Github user str-janus commented on a diff in the pull request:
https://github.com/apache/spark/pull/1898#discussion_r16449470
--- Diff: python/pyspark/rdd.py ---
@@ -584,42 +587,40 @@ def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
-        bounds = list()
+        if numPartitions == 1:
+            if self.getNumPartitions() > 1:
+                self = self.coalesce(1)
+
+            def sort(iterator):
+                return sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
+
+            return self.mapPartitions(sort)
         # first compute the boundary of each part via sampling: we want to partition
         # the key-space into bins such that the bins have roughly the same
         # number of (key, value) pairs falling into them
-        if numPartitions > 1:
-            rddSize = self.count()
-            # constant from Spark's RangePartitioner
-            maxSampleSize = numPartitions * 20.0
-            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
-
-            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
-            samples = sorted(samples, reverse=(not ascending), key=keyfunc)
-
-            # we have numPartitions many parts but one of the them has
-            # an implicit boundary
-            for i in range(0, numPartitions - 1):
-                index = (len(samples) - 1) * (i + 1) / numPartitions
-                bounds.append(samples[index])
+        rddSize = self.count()
+        maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
+        fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
+        samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+        samples = sorted(samples, reverse=(not ascending), key=keyfunc)
+
+        # we have numPartitions many parts but one of the them has
+        # an implicit boundary
+        bounds = [samples[len(samples) * (i + 1) / numPartitions]
+                  for i in range(0, numPartitions - 1)]
         def rangePartitionFunc(k):
-            p = 0
-            while p < len(bounds) and keyfunc(k) > bounds[p]:
-                p += 1
+            p = bisect.bisect_left(bounds, keyfunc(k))
             if ascending:
                 return p
             else:
                 return numPartitions - 1 - p
         def mapFunc(iterator):
-            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
+            return sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
-        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
-                .mapPartitions(mapFunc, preservesPartitioning=True)
-                .flatMap(lambda x: x, preservesPartitioning=True))
+        return self.partitionBy(numPartitions, rangePartitionFunc).mapPartitions(mapFunc, True)
--- End diff ---
I think there might be two unintended side effects of this change. This
code used to work in pyspark:
```
sc.parallelize([5,3,4,2,1]).map(lambda x: (x,x)).sortByKey().take(1)
```
Now it fails with this error:
```
File "<...>/spark/python/pyspark/rdd.py", line 1023, in takeUpToNumLeft
yield next(iterator)
TypeError: list object is not an iterator
```
Changing mapFunc and sort back to generators rather than regular functions
fixes that problem.
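Concretely, here is a rough sketch of the generator form I mean for mapFunc (untested, and written with `kv[0]` rather than the tuple-unpacking lambda just to keep it on one line):
```
def mapFunc(iterator):
    # yield rather than return, so mapPartitions exposes an iterator that
    # take()'s takeUpToNumLeft can call next() on without a TypeError
    yield sorted(iterator, reverse=(not ascending), key=lambda kv: keyfunc(kv[0]))
```
The same change would apply to the `sort` helper in the numPartitions == 1 branch.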
After making that change, there is a second side effect from the removal of the flatMap call: under the default partitioning scheme, the above code returns the following unexpected result:
```
[[(1, 1), (2, 2)]]
```
Removing sortByKey, e.g.:
```
sc.parallelize([5,3,4,2,1]).map(lambda x: (x,x)).take(1)
```
returns the expected result `[(5, 5)]`. Restoring the call to flatMap resolves this second issue as well.
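That is, going back to something like the old return chain (sketch only):
```
return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
        .mapPartitions(mapFunc, preservesPartitioning=True)
        # flatten the one-sorted-list-per-partition output back into (k, v)
        # pairs, so take(1) returns [(1, 1)] rather than the whole partition
        .flatMap(lambda x: x, preservesPartitioning=True))
```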