[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172403479
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java ---
@@ -23,6 +23,10 @@
 /**
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report data partitioning and try to avoid shuffle at Spark side.
+ *
+ * Note that Spark will always infer a
+ * {@link org.apache.spark.sql.catalyst.plans.physical.SinglePartition} partitioning when the
+ * reader creates exactly 1 {@link DataReaderFactory}.
--- End diff --

nit: regardless of whether the reader implements this interface or not.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172328296
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --

That's a good point. Updated the PR.


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172324207
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --

I think it is better to have a lazy val throw an exception if it is called 
at the wrong time (if it is private) than to add the cast: the exception at 
least validates that the assumptions are met and can produce a reasonable 
error message, while the cast might hide the problem, particularly as this 
code evolves over time. It would be reasonable to add another path that 
returns `Seq[DataReaderFactory[_]]`, because that is the method contract, 
even though the callers make an assumption about how it will behave. As much 
of the contract between methods as possible should be expressed in types.
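The trade-off described here can be sketched in plain Java (class and method names are illustrative, not Spark's API): a private accessor that validates its assumption and fails fast with a descriptive message, versus an unchecked cast that would only surface later as a confusing `ClassCastException`.

```java
import java.util.List;

// Hypothetical sketch of a fail-fast accessor. A caller that reaches this
// at the wrong time gets a clear error stating which assumption was
// violated, instead of a silently wrong value hidden behind a cast.
class ScanState {
    private final boolean batchReadEnabled;
    private List<String> batchFactories; // lazily initialized

    ScanState(boolean batchReadEnabled) {
        this.batchReadEnabled = batchReadEnabled;
    }

    List<String> batchReaderFactories() {
        if (!batchReadEnabled) {
            throw new IllegalStateException(
                "batchReaderFactories called, but batch reads are not enabled");
        }
        if (batchFactories == null) {
            batchFactories = List.of("batch-factory-0");
        }
        return batchFactories;
    }
}
```

The point is that the guard documents the caller's obligation in code, so a violated assumption is caught at the call site rather than propagating as a type error elsewhere.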


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172322719
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --

We could, but then both usages of `readerFactories` (along with any future 
ones) would have to add a `case r: SupportsScanColumnarBatch if 
r.enableBatchRead()` check to figure out which val they're supposed to use. I'm 
not a huge fan of lazy vals that will throw errors if anyone tries to 
instantiate them.

Both ways seem equally unclean to me, so I'm fine with switching if you 
think separating them is better.


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172320487
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --

Why not separate the method into `batchReaderFactories` and 
`readerFactories`?


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172318033
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --

I agree that we should separate these cases, but I don't immediately see a 
good way to do so.


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172317377
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
+
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories()
+  private lazy val readerFactories: Seq[DataReaderFactory[_]] = reader match {
--- End diff --

Why not separate the cases for columnar batch and unsafe rows? That would 
avoid needing to cast this later to `Seq[DataReaderFactory[UnsafeRow]]` and 
`Seq[DataReaderFactory[ColumnarBatch]]`, which isn't a very clean solution.


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172316134
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
--- End diff --

Makes sense to me; I also added a comment to the API describing this behavior.


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20726#discussion_r172307625
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
@@ -46,34 +48,46 @@ case class DataSourceV2ScanExec(
       new DataSourcePartitioning(
         s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
+    case _ if readerFactories.size == 1 => SinglePartition
--- End diff --

Shall we move it before `case s: SupportsReportPartitioning`? The difference 
is whether we should always report `SinglePartition` when there is only one 
reader factory. Looking at `EnsureRequirements`, `SinglePartition` seems more 
likely to satisfy the required distribution.
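The ordering question can be sketched in plain Java standing in for the Scala match expression (the names here are illustrative, not Spark internals): placing the single-factory check first means it takes precedence over the reader's self-reported partitioning.

```java
// Hypothetical sketch of match-case ordering. Because the factory-count
// check runs first, a reader producing exactly one factory is always
// reported as SinglePartition, even when it also reports its own
// partitioning via SupportsReportPartitioning.
final class OutputPartitioning {
    static String resolve(boolean readerReportsPartitioning, int numFactories) {
        if (numFactories == 1) {
            return "SinglePartition"; // checked first, so it wins
        }
        if (readerReportsPartitioning) {
            return "DataSourcePartitioning";
        }
        return "UnknownPartitioning(" + numFactories + ")";
    }
}
```

Swapping the first two checks would instead let a reader's reported partitioning take precedence even for a single-factory scan; that is the design choice being discussed.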


---




[GitHub] spark pull request #20726: [SPARK-23574][CORE] Report SinglePartition in Dat...

2018-03-02 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20726

[SPARK-23574][CORE] Report SinglePartition in DataSourceV2ScanExec when 
there's exactly 1 data reader factory.

## What changes were proposed in this pull request?

Report SinglePartition in DataSourceV2ScanExec when there's exactly 1 data 
reader factory.

## How was this patch tested?

Existing unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark SPARK-23574

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20726.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20726


commit efb839759ddc1df1eec1b14500eebe5e4ca903c5
Author: Jose Torres 
Date:   2018-03-03T04:34:05Z

SinglePartition check




---
