GitHub user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16603#discussion_r96359966
  
    --- Diff: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java ---
    @@ -144,23 +170,31 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
           // spilling, avoid to have too many spilled files.
           if (got < required) {
             // Call spill() on other consumers to release memory
    +        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
    +        // which is just spilled in last few times and re-spilling on it will produce many small
    +        // spill files.
    +        List<MemoryConsumer> sortedList = new ArrayList<>();
             for (MemoryConsumer c: consumers) {
               if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
    -            try {
    -              long released = c.spill(required - got, consumer);
    -              if (released > 0) {
    -                logger.debug("Task {} released {} from {} for {}", taskAttemptId,
    -                  Utils.bytesToString(released), c, consumer);
    -                got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
    -                if (got >= required) {
    -                  break;
    -                }
    +            sortedList.add(c);
    +          }
    +        }
    +        Collections.sort(sortedList, new ConsumerComparator());
    +        for (MemoryConsumer c: sortedList) {
    +          try {
    +            long released = c.spill(required - got, consumer);
    +            if (released > 0) {
    +              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
    +                Utils.bytesToString(released), c, consumer);
    +              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
    +              if (got >= required) {
    +                break;
                   }
    -            } catch (IOException e) {
    -              logger.error("error while calling spill() on " + c, e);
    -              throw new OutOfMemoryError("error while calling spill() on " + c + " : "
    -                + e.getMessage());
                 }
    +          } catch (IOException e) {
    +            logger.error("error while calling spill() on " + c, e);
    +            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
    +              + e.getMessage());
               }
    --- End diff --
    
    
    To elaborate, so that there is no confusion:
    A) If we are keeping the existing implementation:
    
    Use a TreeMap where the comparator enforces the natural ordering we need (it is trivial to break ties when the memory in two consumers is the same - for example, make it a composite key where the relevant portion is the memory and the ignored portion is the identityHashCode, with the latter used to break ties in ordering and equality - or something better; see the sketch after the bullets below).
    
    * Instead of inserting everything into a list and sorting it, insert everything into the TreeMap - complexity and amount of code remain the same.
    
    * Instead of iterating over the sorted list trying to find a minimal subset to remove (the current implementation in the PR is not optimal, and only marginally better than what exists), leverage the TreeMap's ability to find an appropriate consumer which satisfies the request if one exists - else use the largest.
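
    A minimal sketch of such a composite key (the class name MyKey and its layout are illustrative, not something from the PR):
    ```
    // Illustrative composite key: ordering is driven by the memory used,
    // with identityHashCode as the ignored portion that breaks ties, so two
    // consumers holding the same amount of memory get distinct keys.
    final class MyKey implements Comparable<MyKey> {
      final long used;      // memory used by the consumer, in bytes
      final int identity;   // System.identityHashCode of the consumer

      MyKey(long used, int identity) {
        this.used = used;
        this.identity = identity;
      }

      @Override
      public int compareTo(MyKey other) {
        int byUsed = Long.compare(used, other.used);
        return byUsed != 0 ? byUsed : Integer.compare(identity, other.identity);
      }
    }
    ```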
    
    Something like this (illustrative only; assuming a TreeMap<MyKey, MemoryConsumer> named map, keyed as above):
    ```
    while (required > 0 && !map.isEmpty()) {
      MyKey requiredKey = new MyKey(required, 0);
      Map.Entry<MyKey, MemoryConsumer> entry = map.floorEntry(requiredKey);
      MemoryConsumer consumer = (entry == null) ? map.lastEntry().getValue() : entry.getValue();
      // evict consumer (and remove it from the map)
      ...
      required -= consumer.getUsed();
    }
    ```
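
    With the TreeMap, each floorEntry/lastEntry lookup and each removal is O(log n), so picking the consumers to evict costs O(k log n) for k evictions - and building the map is O(n log n), no worse than building and sorting the list.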
    
    Note - I don't want to turn this into a dynamic programming problem to do perfect bin packing - a good enough, fast solution is sufficient.
    
    --
    Some examples:

    consumers = 50 MB, 100 MB, 200 MB, 250 MB, 500 MB, 1 GB
    
    Required: 1.4 GB
    Evict: 1 GB, 500 MB

    Required: 300 MB
    Evict: 250 MB, 100 MB

    Required: 400 MB
    Evict: 500 MB

    Required: 60 MB
    Evict: 100 MB

    Required: 200 MB
    Evict: 200 MB
    
    --
    
    There are of course better solutions for the examples above, but a first-cut implementation which does the above should be good enough compared to what currently exists.
    If you want to explore a more optimal solution, that is fine too - as long as time and space complexity are bounded.

