nolanliou opened a new pull request #32667:
URL: https://github.com/apache/spark/pull/32667


   ### What changes were proposed in this pull request?
   Cap the growth of the batch size used by `add_shuffle_key` in the `partitionBy` function, fixing `OverflowError: cannot convert float infinity to integer`.
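   A minimal sketch of the cap, assuming the fix guards the growth branch with `min(sys.maxsize, ...)` (the same guard used in the test snippet below); the authoritative change is the diff in this PR:
   ```
   import sys

   # Sketch of the growth branch with the assumed cap applied.
   batch = 640
   for _ in range(10000):                     # simulate many growth steps
       batch = min(sys.maxsize, batch * 1.5)  # capped: batch stays finite
   print(max(int(batch / 1.5), 1))            # int() succeeds; no OverflowError
   ```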
   
   
   ### Why are the changes needed?
   It is not easy to write a unit test for this, so the snippets below walk through the bug instead.
   * Original code
   ```
           def add_shuffle_key(split, iterator):
   
               buckets = defaultdict(list)
               c, batch = 0, min(10 * numPartitions, 1000)
   
               for k, v in iterator:
                   buckets[partitionFunc(k) % numPartitions].append((k, v))
                   c += 1
   
                   # check used memory and avg size of chunk of objects
                   if (c % 1000 == 0 and get_used_memory() > limit
                           or c > batch):
                       n, size = len(buckets), 0
                       for split in list(buckets.keys()):
                           yield pack_long(split)
                           d = outputSerializer.dumps(buckets[split])
                           del buckets[split]
                           yield d
                           size += len(d)
   
                       avg = int(size / n) >> 20
                       # let 1M < avg < 10M
                       if avg < 1:
                           batch *= 1.5
                       elif avg > 10:
                           batch = max(int(batch / 1.5), 1)
                       c = 0
   ```
   If `get_used_memory() > limit` is always `True` and `avg < 1` is always `True`, `batch` is repeatedly multiplied by 1.5 until the float overflows to `inf`. Once `avg > 10` at some later point, `batch = max(int(batch / 1.5), 1)` calls `int()` on infinity and raises `OverflowError`.
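   The core failure is that Python float multiplication overflows to `inf` silently, while `int()` on an infinite float raises. A standalone illustration (not Spark code):
   ```
   x = 1.0
   while x != float("inf"):
       x *= 1.5       # float multiplication overflows to inf without raising
   try:
       int(x)         # int() cannot represent infinity
   except OverflowError as e:
       print(e)       # cannot convert float infinity to integer
   ```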
   * Sample code to reproduce the bug
   ```
   limit = 100        # stand-in memory limit; "used memory" is always above it
   used_memory = 200
   numPartitions = 64
   c, batch = 0, min(10 * numPartitions, 1000)

   while True:
       c += 1
       if (c % 1000 == 0 and used_memory > limit or c > batch):
           batch = batch * 1.5           # grows without bound, overflows to inf
           d = max(int(batch / 1.5), 1)  # raises OverflowError once batch is inf
           print(c, batch)
   ```
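   Run as-is, `batch` overflows to `inf` after a couple of thousand iterations, and the `int(batch / 1.5)` in that same iteration raises `OverflowError: cannot convert float infinity to integer`.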
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   It is not easy to write a unit test; the sample below exercises the capped logic instead.
   ```
   import sys

   limit = 100        # same stand-in values as the reproducer above
   used_memory = 200
   numPartitions = 64
   c, batch = 0, min(10 * numPartitions, 1000)

   while True:
       c += 1
       if (c % 1000 == 0 and used_memory > limit or c > batch):
           batch = min(sys.maxsize, batch * 1.5)  # capped: batch never reaches inf
           d = max(int(batch / 1.5), 1)           # int() now always succeeds
           print(c, batch)
   ```
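   With the `min(sys.maxsize, ...)` cap, `batch` stays finite, so `int(batch / 1.5)` always succeeds and the loop keeps running without raising.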
   

