Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10277#discussion_r47566839
  
    --- Diff: 
network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexCache.java
 ---
    @@ -0,0 +1,251 @@
    +package org.apache.spark.network.shuffle;
    +
    +import java.io.DataInputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.LongBuffer;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Objects;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
    +import org.apache.spark.network.util.JavaUtils;
    +
    +/**
    + * Store the offsets of the data blocks in cache.
    + * When index cache is not enough, remove firstly used index information.
    + */
    +public class ShuffleIndexCache {
    +  private static final Logger logger = 
LoggerFactory.getLogger(ShuffleIndexCache.class);
    +
    +  private final ConcurrentMap<ShuffleMapId, IndexInformation> indexCache;
    +  private final LinkedBlockingQueue<ShuffleMapId> queue = new 
LinkedBlockingQueue<ShuffleMapId>();
    +  private final int totalMemoryAllowed;
    +  private AtomicInteger totalMemoryUsed = new AtomicInteger();
    +
    +  public ShuffleIndexCache(int totalMemoryAllowed) {
    +    this.indexCache = new ConcurrentHashMap<ShuffleMapId, 
IndexInformation>();
    +    this.totalMemoryAllowed = totalMemoryAllowed;
    +    logger.info("IndexCache created with max memory = {}", 
totalMemoryAllowed);
    +  }
    +
    +  /**
    +   * Get the index information for the given shuffleId, mapId and reduceId.
    +   * It reads the index file into cache if it is not already present.
    +   */
    +  public ShuffleIndexRecord getIndexInformation(
    +    ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) 
throws IOException {
    +    if (totalMemoryAllowed > 0) {
    +      ShuffleMapId shuffleMapId = new ShuffleMapId(shuffleId, mapId);
    +      IndexInformation info = indexCache.get(shuffleMapId);
    +
    +      if (info == null) {
    +        info = readIndexFileToCache(executor, shuffleMapId);
    +      } else {
    +        synchronized(info) {
    +          while (isUnderConstruction(info)) {
    +            try {
    +              info.wait();
    +            } catch (InterruptedException e) {
    +              throw new IOException("Interrupted waiting for 
construction", e);
    +            }
    +          }
    +        }
    +      }
    +
    +      if(info.getLength() == 0 || info.getLength() <= reduceId + 1) {
    +        throw new IOException("Invalid request " + " shuffleMapId = " + 
shuffleMapId +
    +          " reduceId = " + reduceId + " Index Info Length = " + 
info.getLength() +
    +            " index file = " + getIndexFile(executor, mapId, reduceId));
    +      }
    +
    +      return info.getIndex(reduceId);
    +    } else {
    +      return this.readIndexFile(executor, shuffleId, mapId, reduceId);
    +    }
    +  }
    +
    +  public ShuffleIndexRecord readIndexFile(
    +    ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) 
throws IOException {
    --- End diff --
    
    Style nit: please put each parameter on its own line, indented 4 spaces:
    ```
    public ShuffleIndexRecord readIndexFile(
        ExecutorShuffleInfo executor,
        int shuffleId,
        int mapId,
        int reduceId) throws IOException {
      ...
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to