Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/11865#discussion_r60138483
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -971,5 +1013,60 @@ class RDDSuite extends SparkFunSuite with
SharedSparkContext {
assertFails { sc.parallelize(1 to 100) }
assertFails { sc.textFile("/nonexistent-path") }
}
+}
+/**
+ * Coalesces partitions based on their size assuming that the parent RDD
is a [[HadoopRDD]].
+ * Took this class out of the test suite to prevent "Task not
serializable" exceptions.
+ */
+class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with
Serializable {
+ override def coalesce(maxPartitions: Int, parent: RDD[_]):
Array[PartitionGroup] = {
+ val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any,
Any]].getPartitions
+ val groups = ArrayBuffer[PartitionGroup]()
+ var currentGroup = PartitionGroup()
+ var currentSum = 0L
+ var totalSum = 0L
+ var index = 0
+
+ // sort partitions based on the size of the corresponding input splits
+ partitions.sortWith((partition1, partition2) => {
+ val partition1Size =
partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+ val partition2Size =
partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+ partition1Size < partition2Size
+ })
+
+ def updateGroups: Unit = {
--- End diff --
This should be declared with parentheses (`def updateGroups(): Unit`) since it has side effects.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]