Github user mkhaitman commented on a diff in the pull request:
https://github.com/apache/spark/pull/1977#discussion_r27223759
--- Diff: python/pyspark/shuffle.py ---
@@ -244,72 +258,57 @@ def _next_limit(self):
def mergeValues(self, iterator):
""" Combine the items by creator and combiner """
- iterator = iter(iterator)
# speedup attribute lookup
creator, comb = self.agg.createCombiner, self.agg.mergeValue
- d, c, batch = self.data, 0, self.batch
+ c, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batch
+ limit = self.memory_limit
for k, v in iterator:
+ d = pdata[hfun(k)] if pdata else data
d[k] = comb(d[k], v) if k in d else creator(v)
c += 1
- if c % batch == 0 and get_used_memory() > self.memory_limit:
- self._spill()
- self._partitioned_mergeValues(iterator, self._next_limit())
- break
+ if c >= batch:
+ if get_used_memory() >= limit:
+ self._spill()
+ limit = self._next_limit()
+ batch /= 2
+ c = 0
+ else:
+ batch *= 1.5
--- End diff --
Would it make sense to do a final memory check after the for loop, in case
we just added another 999 items (worst case) without spilling any of it to
disk?
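
For concreteness, a rough sketch of what such a final check might look like
(this only reuses the names already visible in the diff above — get_used_memory,
limit, and self._spill — and is not necessarily how it would be committed):

    for k, v in iterator:
        ...  # existing per-item merge and batched memory checks from the diff
    # final check after the loop: up to batch - 1 items may have been added
    # since the last memory check and never considered for spilling
    if get_used_memory() >= limit:
        self._spill()

Since the loop is finished at that point, only the memory reading matters; the
batch counter does not need to be reset.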