Github user mkhaitman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1977#discussion_r27223983
  
    --- Diff: python/pyspark/shuffle.py ---
    @@ -244,72 +258,57 @@ def _next_limit(self):
     
         def mergeValues(self, iterator):
             """ Combine the items by creator and combiner """
    -        iterator = iter(iterator)
             # speedup attribute lookup
             creator, comb = self.agg.createCombiner, self.agg.mergeValue
    -        d, c, batch = self.data, 0, self.batch
    +        c, data, pdata, hfun, batch = 0, self.data, self.pdata, self._partition, self.batch
    +        limit = self.memory_limit
     
             for k, v in iterator:
    +            d = pdata[hfun(k)] if pdata else data
                 d[k] = comb(d[k], v) if k in d else creator(v)
     
                 c += 1
    -            if c % batch == 0 and get_used_memory() > self.memory_limit:
    -                self._spill()
    -                self._partitioned_mergeValues(iterator, self._next_limit())
    -                break
    +            if c >= batch:
    +                if get_used_memory() >= limit:
    +                    self._spill()
    +                    limit = self._next_limit()
    +                    batch /= 2
    +                    c = 0
    +                else:
    +                    batch *= 1.5
     
         def _partition(self, key):
             """ Return the partition for key """
             return hash((key, self._seed)) % self.partitions
     
    -    def _partitioned_mergeValues(self, iterator, limit=0):
    -        """ Partition the items by key, then combine them """
    -        # speedup attribute lookup
    -        creator, comb = self.agg.createCombiner, self.agg.mergeValue
    -        c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch
    -
    -        for k, v in iterator:
    -            d = pdata[hfun(k)]
    -            d[k] = comb(d[k], v) if k in d else creator(v)
    -            if not limit:
    -                continue
    -
    -            c += 1
    -            if c % batch == 0 and get_used_memory() > limit:
    -                self._spill()
    -                limit = self._next_limit()
    +    def _object_size(self, obj):
    +        """ How much of memory for this obj, assume that all the objects
    +        consume similar bytes of memory
    +        """
    +        return 1
     
    -    def mergeCombiners(self, iterator, check=True):
    +    def mergeCombiners(self, iterator, limit=None):
             """ Merge (K,V) pair by mergeCombiner """
    -        iterator = iter(iterator)
    +        if limit is None:
    +            limit = self.memory_limit
             # speedup attribute lookup
    -        d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
    -        c = 0
    -        for k, v in iterator:
    -            d[k] = comb(d[k], v) if k in d else v
    -            if not check:
    -                continue
    -
    -            c += 1
    -            if c % batch == 0 and get_used_memory() > self.memory_limit:
    -                self._spill()
    -                self._partitioned_mergeCombiners(iterator, self._next_limit())
    -                break
    -
    -    def _partitioned_mergeCombiners(self, iterator, limit=0):
    -        """ Partition the items by key, then merge them """
    -        comb, pdata = self.agg.mergeCombiners, self.pdata
    -        c, hfun = 0, self._partition
    +        comb, hfun, objsize = self.agg.mergeCombiners, self._partition, self._object_size
    +        c, data, pdata, batch = 0, self.data, self.pdata, self.batch
             for k, v in iterator:
    -            d = pdata[hfun(k)]
    +            d = pdata[hfun(k)] if pdata else data
                 d[k] = comb(d[k], v) if k in d else v
                 if not limit:
                     continue
     
    -            c += 1
    -            if c % self.batch == 0 and get_used_memory() > limit:
    -                self._spill()
    -                limit = self._next_limit()
    +            c += objsize(v)
    +            if c > batch:
    +                if get_used_memory() > limit:
    +                    self._spill()
    +                    limit = self._next_limit()
    +                    batch /= 2
    +                    c = 0
    +                else:
    +                    batch *= 1.5
     
    --- End diff ---
    
    Would it make sense to do a final memory check after the for loop, in case 
we just added another 999 items (worst case) without spilling any of it to disk?
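
    To make that concrete, here is a minimal, self-contained sketch of the
    trailing check being suggested (an illustration, not the actual patch).
    The names get_used_memory, _spill, memory_limit, and the adaptive batch
    handling mirror the quoted diff; the MergerSketch class, the toy
    combiner, and the stubbed _spill are invented here only so the example
    runs on its own.

        def get_used_memory():
            # Stand-in for pyspark.shuffle.get_used_memory (MB currently in
            # use); a constant keeps this sketch runnable without pyspark.
            return 0

        class MergerSketch(object):
            """ Toy stand-in for the merger in shuffle.py, keeping only the
            spill bookkeeping. """

            def __init__(self, memory_limit, batch=1000):
                self.memory_limit = memory_limit
                self.batch = batch
                self.data = {}
                self.spills = 0

            def _spill(self):
                # Stand-in for writing self.data out to disk partitions.
                self.data.clear()
                self.spills += 1

            def mergeValues(self, iterator):
                d, batch, limit, c = self.data, self.batch, self.memory_limit, 0
                for k, v in iterator:
                    d[k] = d[k] + v if k in d else v  # toy combiner
                    c += 1
                    if c >= batch:  # periodic check, as in the diff
                        if get_used_memory() >= limit:
                            self._spill()
                            # the real code also recomputes limit via
                            # _next_limit() here
                            batch /= 2
                            c = 0
                        else:
                            batch *= 1.5
                # The suggested final check: the last partial batch was added
                # without a memory check, so spill once more if it pushed
                # usage over the limit.
                if c and get_used_memory() >= limit:
                    self._spill()

    In the worst case that final partial batch holds just under batch freshly
    added items (the "another 999 items" above), and the extra check costs
    one more get_used_memory() call per mergeValues() invocation.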

