Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10705#discussion_r53744323
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala ---
    @@ -0,0 +1,356 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.lang
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    +import com.google.common.collect.ConcurrentHashMultiset
    +
    +import org.apache.spark.{Logging, TaskContext}
    +
    +
    +/**
    + * Tracks metadata for an individual block.
    + *
    + * NOTE(review): every field below is a mutable `var` with no synchronization in this class;
    + * presumably all reads and writes happen while holding the owning [[BlockInfoManager]]'s lock
    + * (its `infos` map is annotated `@GuardedBy("this")`) -- confirm before accessing these
    + * fields from anywhere else.
    + *
    + * @param level the block's storage level. This is the requested persistence level, not the
    + *              effective storage level of the block (e.g. a requested level of
    + *              MEMORY_AND_DISK does not imply that the block is actually resident in memory).
    + * @param tellMaster whether state changes for this block should be reported to the master.
    + *                   This is true for most blocks, but is false for broadcast blocks.
    + */
    +private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: 
Boolean) {
    +
    +  /**
    +   * The size of the block, in bytes.
    +   */
    +  var size: Long = 0
    +
    +  /**
    +   * The number of times that this block has been locked for reading. Read locks are
    +   * re-entrant, so a single task can account for several of these acquisitions.
    +   * NOTE(review): presumably decremented on each read unlock, i.e. this counts outstanding
    +   * read locks -- confirm against the unlock path in [[BlockInfoManager]].
    +   */
    +  var readerCount: Int = 0
    +
    +  /**
    +   * The task attempt id of the task which currently holds the write lock for this block, or
    +   * -1 if this block is not locked for writing.
    +   */
    +  var writerTask: Long = -1
    +
    +  // Invariant (the two lines are contrapositives: never read- and write-locked at once):
    +  //     (writerTask != -1) implies (readerCount == 0)
    +  //     (readerCount != 0) implies (writerTask == -1)
    +  // TODO: add assertions checking this invariant around every method
    +
    +  /**
    +   * True if this block has been removed from the BlockManager and false otherwise.
    +   * This field is used to communicate block deletion to blocked readers / writers (see its
    +   * usage in [[BlockInfoManager]]).
    +   */
    +  var removed: Boolean = false
    +
    +  // TODO: Add timestamps on lock acquisitions
    +}
    +// TODO: in debugging mode, check that locks have not been held for too long, and dump
    +// debug info every few minutes.
    +
    +/**
    + * Component of the [[BlockManager]] which tracks metadata for blocks and manages block
    + * locking.
    + *
    + * The locking interface exposed by this class is a readers-writer lock. Every lock acquisition
    + * is automatically associated with a running task, and locks are automatically released upon
    + * task completion or failure.
    + *
    + * This class is thread-safe.
    + */
    +private[storage] class BlockInfoManager extends Logging {
    +
    +  private type TaskAttemptId = Long
    +
    +  /**
    +   * Used to look up metadata for individual blocks. Entries are added to 
this map via an atomic
    +   * set-if-not-exists operation ([[lockNewBlockForWriting()]]) and are 
removed
    +   * by [[removeBlock()]].
    +   */
    +  @GuardedBy("this")
    +  private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
    +
    +  /**
    +   * Tracks the set of blocks that each task has locked for writing.
    +   */
    +  @GuardedBy("this")
    +  private[this] val writeLocksByTask =
    +    new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
    +      with mutable.MultiMap[TaskAttemptId, BlockId]
    +
    +  /**
    +   * Tracks the set of blocks that each task has locked for reading, along 
with the number of times
    +   * that a block has been locked (since our read locks are re-entrant). 
This is thread-safe.
    +   */
    +  private[this] val readLocksByTask: LoadingCache[lang.Long, 
ConcurrentHashMultiset[BlockId]] = {
    +    // We need to explicitly box as java.lang.Long to avoid a type 
mismatch error:
    +    val loader = new CacheLoader[java.lang.Long, 
ConcurrentHashMultiset[BlockId]] {
    +      override def load(t: java.lang.Long) = 
ConcurrentHashMultiset.create[BlockId]()
    +    }
    +    CacheBuilder.newBuilder().build(loader)
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  /**
    +   * Returns the current task's task attempt id (which uniquely identifies the task), or -1024
    +   * if called outside of a task, i.e. when `TaskContext.get()` returns null. -1024 was chosen
    +   * because it is different from the -1 which is used in [[BlockInfo.writerTask]] to denote
    +   * the absence of a write lock, so "no task" can never be mistaken for "no writer".
    +   */
    +  private def currentTaskAttemptId: TaskAttemptId = {
    +    // TODO(josh): assert that this only happens on the driver?
    +    // What about block transfer / getRemote()?
    +    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1024L)
    +  }
    +
    +  /**
    +   * Lock a block for reading and return its metadata.
    +   *
    +   * A single task can lock a block multiple times for reading, in which 
case each lock will need
    +   * to be released separately.
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to