[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-08-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16766


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r122619579
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2603,12 +2603,27 @@ class Dataset[T] private[sql](
    * current upstream partitions will be executed in parallel (per whatever
    * the current partitioning is).
    *
+   * A [[PartitionCoalescer]] can also be supplied allowing the behavior of the partitioning to be
--- End diff --

It sounds like this trait can't be linked as-is in the generated Java docs. Simply wrapping it in `` `...` `` would be fine.
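For illustration, the doc line could plausibly become something like this (a sketch of the suggested change; the final wording is the author's to choose):

```scala
/**
 * ...
 * A `PartitionCoalescer` can also be supplied, allowing the behavior of the
 * partitioning to be customized, similar to `RDD.coalesce`.
 */
```

With plain backticks the identifier renders as code in both the Scaladoc and the generated Java docs, whereas a `[[...]]` link to a type like this can break the Java doc generation.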



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-06-18 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r122619526
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2603,12 +2603,27 @@ class Dataset[T] private[sql](
    * current upstream partitions will be executed in parallel (per whatever
    * the current partitioning is).
    *
+   * A [[PartitionCoalescer]] can also be supplied allowing the behavior of the partitioning to be
+   * customized similar to [[RDD.coalesce]].
--- End diff --

I think it should be `[[org.apache.spark.rdd.RDD##coalesce]]`.



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-03 Thread mariusvniekerk
Github user mariusvniekerk commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99369813
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -497,7 +496,9 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
  * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
  * the 100 new partitions will claim 10 of the current partitions.
  */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
+case class CoalesceExec(numPartitions: Int, child: SparkPlan,
+                        partitionCoalescer: Option[PartitionCoalescer]
+                       ) extends UnaryExecNode {
--- End diff --

Do you guys have a .scalafmt.conf that applies all of this? That should make things cleaner.



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-03 Thread mariusvniekerk
Github user mariusvniekerk commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99366754
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -117,6 +134,34 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   data: _*)
   }
 
+  test("coalesce, custom") {
+
+    val maxSplitSize = 512
+    // Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]] we first
+    // write out to disk, to ensure that our splits are in fact [[FileSplit]] instances.
+    val data = (1 to 1000).map(i => ClassData(i.toString, i))
+    data.toDS().repartition(10).write.format("csv").save(path.toString)
+
+    val ds = spark.read.format("csv").load(path.toString).as[ClassData]
--- End diff --

Oh right, CSV doesn't handle headers by default.



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-03 Thread mariusvniekerk
Github user mariusvniekerk commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99366143
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -17,24 +17,41 @@
 
 package org.apache.spark.sql
 
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, File, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
+import org.apache.hadoop.mapred.FileSplit
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.rdd.{CoalescedRDDPartition, HadoopPartition, SizeBasedCoalescer}
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec}
+import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
 
-class DatasetSuite extends QueryTest with SharedSQLContext {
+class DatasetSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
   import testImplicits._
 
+  private var path: File = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    path = Utils.createTempDir()
+    path.delete()
+  }
+
+  after {
+    Utils.deleteRecursively(path)
+  }
--- End diff --

Ah, thanks. I had looked at the writer tests.



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-03 Thread mariusvniekerk
Github user mariusvniekerk commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99363149
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+ * Returns a new RDD that has exactly `numPartitions` partitions.
+ */
+case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
--- End diff --

That sounds good.



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99286995
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -117,6 +134,34 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   data: _*)
   }
 
+  test("coalesce, custom") {
+
+    val maxSplitSize = 512
+    // Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]] we first
+    // write out to disk, to ensure that our splits are in fact [[FileSplit]] instances.
+    val data = (1 to 1000).map(i => ClassData(i.toString, i))
+    data.toDS().repartition(10).write.format("csv").save(path.toString)
+
+    val ds = spark.read.format("csv").load(path.toString).as[ClassData]
+    val coalescedDataSet =
+      ds.coalesce(2, partitionCoalescer = Option(new SizeBasedCoalescer(maxSplitSize)))
+
+    assert(coalescedDataSet.rdd.partitions.length <= 10)
+
+    var totalPartitionCount = 0L
+    coalescedDataSet.rdd.partitions.foreach(partition => {
+      var splitSizeSum = 0L
+      partition.asInstanceOf[CoalescedRDDPartition].parents.foreach(partition => {
+        val split = partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
+        splitSizeSum += split.getLength
+        totalPartitionCount += 1
+      })
+      assert(splitSizeSum <= maxSplitSize)
+    })
+    assert(totalPartitionCount == 10)
+
--- End diff --

Nit: Remove this empty line.



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99286957
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -17,24 +17,41 @@
 
 package org.apache.spark.sql
 
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, File, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
+import org.apache.hadoop.mapred.FileSplit
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.rdd.{CoalescedRDDPartition, HadoopPartition, SizeBasedCoalescer}
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec}
+import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
 
-class DatasetSuite extends QueryTest with SharedSQLContext {
+class DatasetSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
   import testImplicits._
 
+  private var path: File = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    path = Utils.createTempDir()
+    path.delete()
+  }
+
+  after {
+    Utils.deleteRecursively(path)
+  }
--- End diff --

No need to do this if you use `withTempPath`. [This](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala#L247-L265) is an example.
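For reference, a minimal sketch of the test using `withTempPath` (assuming the rest of the body stays as in the diff above; `withTempPath` supplies a fresh, non-existent path and deletes it after the block):

```scala
test("coalesce, custom") {
  withTempPath { path =>
    // `withTempPath` hands us a non-existent temp path and removes it afterwards,
    // so the beforeAll()/after cleanup above becomes unnecessary.
    val data = (1 to 1000).map(i => ClassData(i.toString, i))
    data.toDS().repartition(10).write.format("csv").save(path.toString)
    // ... read back and assert as in the diff above ...
  }
}
```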



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99286805
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -117,6 +134,34 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   data: _*)
   }
 
+  test("coalesce, custom") {
+
+    val maxSplitSize = 512
+    // Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]] we first
+    // write out to disk, to ensure that our splits are in fact [[FileSplit]] instances.
+    val data = (1 to 1000).map(i => ClassData(i.toString, i))
+    data.toDS().repartition(10).write.format("csv").save(path.toString)
--- End diff --

Use `withTempPath` to generate the path?



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99286475
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -117,6 +134,34 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   data: _*)
   }
 
+  test("coalesce, custom") {
+
+    val maxSplitSize = 512
+    // Similar to the implementation of `test("custom RDD coalescer")` from [[RDDSuite]] we first
+    // write out to disk, to ensure that our splits are in fact [[FileSplit]] instances.
+    val data = (1 to 1000).map(i => ClassData(i.toString, i))
+    data.toDS().repartition(10).write.format("csv").save(path.toString)
+
+    val ds = spark.read.format("csv").load(path.toString).as[ClassData]
--- End diff --

```
cannot resolve '`a`' given input columns: [_c0, _c1];
```
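(For context: the error occurs because the CSV is read back without column names, so the columns come in as `_c0`/`_c1` and `.as[ClassData]` cannot resolve them. One way to make it resolve, sketched here as a possible fix rather than what the PR finally did, and assuming `ClassData(a: String, b: Int)` as defined in this suite, is to read with an explicit schema:)

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Name the columns to match the fields of ClassData(a: String, b: Int).
val schema = StructType(Seq(
  StructField("a", StringType),
  StructField("b", IntegerType)))

val ds = spark.read.format("csv").schema(schema).load(path.toString).as[ClassData]
```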



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99286218
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -497,7 +496,9 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
  * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
  * the 100 new partitions will claim 10 of the current partitions.
  */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
+case class CoalesceExec(numPartitions: Int, child: SparkPlan,
+                        partitionCoalescer: Option[PartitionCoalescer]
+                       ) extends UnaryExecNode {
--- End diff --

```
case class CoalesceExec(
    numPartitions: Int,
    child: SparkPlan,
    partitionCoalescer: Option[PartitionCoalescer]) extends UnaryExecNode {
```



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99286066
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -19,9 +19,8 @@ package org.apache.spark.sql.execution
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
-
--- End diff --

Add it back?



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99285902
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+ * Returns a new RDD that has exactly `numPartitions` partitions.
--- End diff --

This description is not right. 



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99285925
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+ * Returns a new RDD that has exactly `numPartitions` partitions.
+ */
+case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
+    child: LogicalPlan)
+  extends UnaryNode {
--- End diff --

```Scala
case class PartitionCoalesce(
    numPartitions: Int,
    partitionCoalescer: Option[PartitionCoalescer],
    child: LogicalPlan) extends UnaryNode {
```



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99285876
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+ * Returns a new RDD that has exactly `numPartitions` partitions.
+ */
+case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
--- End diff --

The name still looks inconsistent with the others. How about `PartitionCoalesce`?



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99285447
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.rdd.PartitionCoalescer
 import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import scala.collection.mutable.ArrayBuffer
--- End diff --

Unused?



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99284849
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -497,7 +496,9 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
  * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
  * the 100 new partitions will claim 10 of the current partitions.
  */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
+case class CoalesceExec(numPartitions: Int, child: SparkPlan,
+                        partitionCoalescer: Option[PartitionCoalescer]
+                       ) extends UnaryExecNode {
--- End diff --

The same indent issue here. 



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99284809
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2437,9 +2435,12 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 1.6.0
    */
-  def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = false, logicalPlan)
-  }
+  def coalesce(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer]): Dataset[T] =
+    withTypedPlan {
+      CoalesceLogical(numPartitions, partitionCoalescer, logicalPlan)
+    }
+
+  def coalesce(numPartitions: Int): Dataset[T] = coalesce(numPartitions, None)
--- End diff --

Please also add the function description, like what we did in the other functions in Dataset.scala?
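For instance, something along these lines (illustrative wording only, mirroring the surrounding Scaladoc style; the body is taken from the diff above):

```scala
/**
 * Returns a new Dataset that has exactly `numPartitions` partitions, using the supplied
 * `PartitionCoalescer` (if any) to control how the current partitions are grouped together,
 * similar to `RDD.coalesce`.
 *
 * @group typedrel
 */
def coalesce(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer]): Dataset[T] =
  withTypedPlan {
    CoalesceLogical(numPartitions, partitionCoalescer, logicalPlan)
  }
```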



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-02 Thread mariusvniekerk
Github user mariusvniekerk commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99132600
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+  * Returns a new RDD that has exactly `numPartitions` partitions.
+  */
+case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
--- End diff --

The main reason is that there was already a `Coalesce` expression class.



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99065789
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+  * Returns a new RDD that has exactly `numPartitions` partitions.
+  */
+case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
--- End diff --

`CoalesceLogical` -> `Coalesce`?



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16766#discussion_r99064595
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -823,6 +825,17 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
 }
 
 /**
+  * Returns a new RDD that has exactly `numPartitions` partitions.
+  */
+case class CoalesceLogical(numPartitions: Int, partitionCoalescer: Option[PartitionCoalescer],
+    child: LogicalPlan)
--- End diff --

Could you follow the styles documented in https://github.com/databricks/scala-style-guide?



[GitHub] spark pull request #16766: [SPARK-19426][SQL] Custom coalesce for Dataset

2017-02-01 Thread mariusvniekerk
GitHub user mariusvniekerk opened a pull request:

https://github.com/apache/spark/pull/16766

[SPARK-19426][SQL] Custom coalesce for Dataset

## What changes were proposed in this pull request?

This adds support for using the `PartitionCoalescer` feature added in #11865 (SPARK-14042) to the Dataset API.
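A minimal usage sketch (assuming the `SizeBasedCoalescer` helper that this PR's test imports from `org.apache.spark.rdd`; the API surface is as proposed here and may change in review):

```scala
import org.apache.spark.rdd.SizeBasedCoalescer

// Read a dataset that currently has many small partitions...
val ds = spark.read.format("csv").load("/path/to/data")
// ...and coalesce it, letting the custom coalescer decide how parent partitions
// are grouped (here: by total input split size, capped at 512 bytes per group).
val coalesced = ds.coalesce(2, partitionCoalescer = Some(new SizeBasedCoalescer(512)))
```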

## How was this patch tested?

Manual tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mariusvniekerk/spark wip_customCoalesce

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16766.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16766


commit 15b34dd88f81b20c1be1ef42e6b647d42ef5f462
Author: Marius van Niekerk 
Date:   2016-11-07T22:06:38Z

custom coalesce



