Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1898#discussion_r16208877
--- Diff: python/pyspark/rdd.py ---
@@ -584,42 +587,40 @@ def sortByKey(self, ascending=True,
numPartitions=None, keyfunc=lambda x: x):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
- bounds = list()
+ if numPartitions == 1:
+ if self.getNumPartitions() > 1:
+ self = self.coalesce(1)
+
+ def sort(iterator):
+ return sorted(iterator, reverse=(not ascending),
key=lambda (k, v): keyfunc(k))
+
+ return self.mapPartitions(sort)
# first compute the boundary of each part via sampling: we want to
partition
# the key-space into bins such that the bins have roughly the same
# number of (key, value) pairs falling into them
- if numPartitions > 1:
- rddSize = self.count()
- # constant from Spark's RangePartitioner
- maxSampleSize = numPartitions * 20.0
- fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
-
- samples = self.sample(False, fraction, 1).map(
- lambda (k, v): k).collect()
- samples = sorted(samples, reverse=(not ascending), key=keyfunc)
-
- # we have numPartitions many parts but one of the them has
- # an implicit boundary
- for i in range(0, numPartitions - 1):
- index = (len(samples) - 1) * (i + 1) / numPartitions
- bounds.append(samples[index])
+ rddSize = self.count()
+ maxSampleSize = numPartitions * 20.0 # constant from Spark's
RangePartitioner
+ fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
+ samples = self.sample(False, fraction, 1).map(lambda (k, v):
k).collect()
+ samples = sorted(samples, reverse=(not ascending), key=keyfunc)
+
+ # we have numPartitions many parts but one of the them has
+ # an implicit boundary
+ bounds = [samples[len(samples) * (i + 1) / numPartitions]
+ for i in range(0, numPartitions - 1)]
def rangePartitionFunc(k):
- p = 0
- while p < len(bounds) and keyfunc(k) > bounds[p]:
- p += 1
+ p = bisect.bisect_left(bounds, keyfunc(k))
if ascending:
return p
else:
return numPartitions - 1 - p
def mapFunc(iterator):
- yield sorted(iterator, reverse=(not ascending), key=lambda (k,
v): keyfunc(k))
+ return sorted(iterator, reverse=(not ascending), key=lambda
(k, v): keyfunc(k))
- return (self.partitionBy(numPartitions,
partitionFunc=rangePartitionFunc)
- .mapPartitions(mapFunc, preservesPartitioning=True)
- .flatMap(lambda x: x, preservesPartitioning=True))
+ return self.partitionBy(numPartitions,
rangePartitionFunc).mapPartitions(mapFunc, True)
--- End diff --
Ah, I guess it's due to the yield -> return change above?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]