[SPARK-14654][CORE] New accumulator API

## What changes were proposed in this pull request?

This PR introduces a new accumulator API that is much simpler than the previous one:

1. The type hierarchy is simplified; there is now only one `Accumulator` class.
2. The `initialValue` and `zeroValue` concepts are combined into a single 
concept: `zeroValue`.
3. There is only one `register` method; accumulator registration and cleanup 
registration are combined.
4. The `id`, `name` and `countFailedValues` are combined into an 
`AccumulatorMetadata`, which is provided during registration.

`SQLMetric` is a good example to show the simplicity of this new API.
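
As an illustration of the shape of the API, here is a minimal, self-contained sketch. It assumes only the five abstract methods that the new base class declares in this patch (`isZero`, `copyAndReset`, `add`, `merge`, `localValue`). The names `SketchAccumulator`, `MaxAccumulator` and `AccumulatorSketch` are hypothetical and not part of Spark; registration, metadata and serialization are left out.

```scala
// Simplified stand-in for the abstract class added in this patch (hypothetical name).
// Registration, metadata and serialization are deliberately left out.
abstract class SketchAccumulator[IN, OUT] {
  def isZero(): Boolean
  def copyAndReset(): SketchAccumulator[IN, OUT]
  def add(v: IN): Unit
  def merge(other: SketchAccumulator[IN, OUT]): Unit
  def localValue: OUT
}

// Hypothetical accumulator that tracks the maximum value seen so far.
class MaxAccumulator extends SketchAccumulator[Long, Option[Long]] {
  private var _max: Option[Long] = None

  override def isZero(): Boolean = _max.isEmpty

  override def copyAndReset(): MaxAccumulator = new MaxAccumulator

  override def add(v: Long): Unit = {
    _max = Some(_max.fold(v)(m => math.max(m, v)))
  }

  override def merge(other: SketchAccumulator[Long, Option[Long]]): Unit = {
    other.localValue.foreach(v => add(v))
  }

  override def localValue: Option[Long] = _max
}

object AccumulatorSketch {
  // Mimics a driver-side accumulator receiving updates from two tasks.
  def run(): Option[Long] = {
    val driver = new MaxAccumulator
    val task1 = driver.copyAndReset() // each task starts from a zero-value copy
    task1.add(3L)
    task1.add(7L)
    val task2 = driver.copyAndReset()
    task2.add(5L)
    driver.merge(task1)
    driver.merge(task2)
    driver.localValue
  }
}
```

In this sketch each task works on a zero-value copy and the driver merges the copies back in place, which is roughly the flow the new API is built around.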

What we break:

1. There is no `setValue` anymore. In the new API, the intermediate type can 
differ from the result type, so a general `setValue` is very hard to implement.
2. An accumulator can't be serialized before it is registered.
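
To see why a general `setValue` is hard, consider an average accumulator, similar in spirit to the `AverageAccumulator` added in this patch. The sketch below is self-contained and the names `AverageSketch` and `SetValueSketch` are hypothetical. It shows two accumulators that report the same result while holding different intermediate state, so a `setValue(3.0)` could not know which state to restore.

```scala
// Hypothetical average accumulator: the state is (sum, count), the result is a Double.
final class AverageSketch {
  private var _sum = 0.0
  private var _count = 0L

  def add(v: Double): Unit = { _sum += v; _count += 1 }

  def merge(other: AverageSketch): Unit = {
    _sum += other._sum
    _count += other._count
  }

  def value: Double = if (_count == 0) Double.NaN else _sum / _count
}

object SetValueSketch {
  private def averageOf(values: Double*): AverageSketch = {
    val acc = new AverageSketch
    values.foreach(v => acc.add(v))
    acc
  }

  // Returns (a before, b before, a after merge, b after merge).
  def run(): (Double, Double, Double, Double) = {
    val a = averageOf(2.0, 4.0) // state: sum = 6.0, count = 2
    val b = averageOf(3.0)      // state: sum = 3.0, count = 1
    val before = (a.value, b.value)
    // Same visible value, but merging the same update gives different results.
    a.merge(averageOf(9.0))
    b.merge(averageOf(9.0))
    (before._1, before._2, a.value, b.value)
  }
}
```

Both accumulators report `3.0` before the merge and diverge afterwards, so the result value alone does not determine the accumulator's state.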

Problems that need to be addressed in follow-ups:

1. With this new API, `AccumulatorInfo` doesn't make a lot of sense: the 
partial output is not a partial update, so we need to expose the intermediate 
value.
2. `ExceptionFailure` should not carry the accumulator updates. Why do users 
care about accumulator updates for failed cases? It looks like we only use this 
feature to update the internal metrics; how about sending a heartbeat to 
update internal metrics after the failure event?
3. The public event `SparkListenerTaskEnd` carries a `TaskMetrics`. Ideally 
this `TaskMetrics` doesn't need to carry external accumulators, as the only 
method of `TaskMetrics` that can access external accumulators is 
`private[spark]`. However, `SQLListener` uses it to retrieve SQL metrics.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #12612 from cloud-fan/acc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf5496db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf5496db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf5496db

Branch: refs/heads/master
Commit: bf5496dbdac75ea69081c95a92a29771e635ea98
Parents: be317d4
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Apr 28 00:26:39 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Apr 28 00:26:39 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulable.scala    |  64 +--
 .../scala/org/apache/spark/Accumulator.scala    |  67 ----
 .../scala/org/apache/spark/ContextCleaner.scala |   4 +-
 .../org/apache/spark/HeartbeatReceiver.scala    |   2 +-
 .../scala/org/apache/spark/NewAccumulator.scala | 391 +++++++++++++++++++
 .../scala/org/apache/spark/SparkContext.scala   | 107 ++++-
 .../scala/org/apache/spark/TaskContext.scala    |   2 +-
 .../org/apache/spark/TaskContextImpl.scala      |   2 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |  20 +-
 .../org/apache/spark/executor/Executor.scala    |  14 +-
 .../apache/spark/executor/InputMetrics.scala    |  12 +-
 .../apache/spark/executor/OutputMetrics.scala   |  12 +-
 .../spark/executor/ShuffleReadMetrics.scala     |  64 ++-
 .../spark/executor/ShuffleWriteMetrics.scala    |  19 +-
 .../org/apache/spark/executor/TaskMetrics.scala | 228 ++++++-----
 .../apache/spark/scheduler/DAGScheduler.scala   |  20 +-
 .../spark/scheduler/DAGSchedulerEvent.scala     |   2 +-
 .../apache/spark/scheduler/SparkListener.scala  |   3 -
 .../org/apache/spark/scheduler/Stage.scala      |   2 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  10 +-
 .../org/apache/spark/scheduler/TaskResult.scala |   8 +-
 .../spark/scheduler/TaskResultGetter.scala      |   7 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   3 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   5 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   6 +-
 .../serializer/SerializationDebugger.scala      |   8 +-
 .../org/apache/spark/status/api/v1/api.scala    |   4 +-
 .../org/apache/spark/storage/BlockManager.scala |   8 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  17 +-
 .../org/apache/spark/util/JsonProtocol.scala    |   4 +-
 .../org/apache/spark/AccumulatorSuite.scala     | 132 +++----
 .../apache/spark/HeartbeatReceiverSuite.scala   |   6 +-
 .../apache/spark/InternalAccumulatorSuite.scala |  24 +-
 .../scala/org/apache/spark/SparkFunSuite.scala  |   2 +-
 .../spark/executor/TaskMetricsSuite.scala       |  85 ++--
 .../spark/scheduler/DAGSchedulerSuite.scala     |  71 ++--
 .../scheduler/ExternalClusterManagerSuite.scala |   7 +-
 .../spark/scheduler/TaskContextSuite.scala      |  16 +-
 .../spark/scheduler/TaskResultGetterSuite.scala |   4 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  11 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |  12 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |   7 +-
 project/MimaExcludes.scala                      |  12 +
 .../spark/sql/execution/ExistingRDD.scala       |   6 +-
 .../apache/spark/sql/execution/ExpandExec.scala |   2 +-
 .../spark/sql/execution/GenerateExec.scala      |   2 +-
 .../sql/execution/LocalTableScanExec.scala      |   2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |   7 +-
 .../spark/sql/execution/SparkPlanInfo.scala     |   3 +-
 .../sql/execution/UnsafeRowSerializer.scala     |  13 +-
 .../sql/execution/WholeStageCodegenExec.scala   |   8 +-
 .../aggregate/SortBasedAggregateExec.scala      |   2 +-
 .../SortBasedAggregationIterator.scala          |   4 +-
 .../execution/aggregate/TungstenAggregate.scala |   8 +-
 .../aggregate/TungstenAggregationIterator.scala |   8 +-
 .../sql/execution/basicPhysicalOperators.scala  |   6 +-
 .../columnar/InMemoryTableScanExec.scala        |   6 +-
 .../exchange/BroadcastExchangeExec.scala        |   8 +-
 .../execution/joins/BroadcastHashJoinExec.scala |   2 +-
 .../joins/BroadcastNestedLoopJoinExec.scala     |   2 +-
 .../execution/joins/CartesianProductExec.scala  |   2 +-
 .../spark/sql/execution/joins/HashJoin.scala    |   4 +-
 .../execution/joins/ShuffledHashJoinExec.scala  |   2 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |  12 +-
 .../sql/execution/metric/SQLMetricInfo.scala    |   2 +-
 .../spark/sql/execution/metric/SQLMetrics.scala | 218 +++--------
 .../spark/sql/execution/ui/SQLListener.scala    |  16 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |   6 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   8 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  |  10 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  32 +-
 .../spark/sql/util/DataFrameCallbackSuite.scala |   6 +-
 .../sql/hive/execution/HiveTableScanExec.scala  |   2 +-
 73 files changed, 1071 insertions(+), 842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/Accumulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala 
b/core/src/main/scala/org/apache/spark/Accumulable.scala
index e8f053c..c76720c 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -63,7 +63,7 @@ class Accumulable[R, T] private (
       param: AccumulableParam[R, T],
       name: Option[String],
       countFailedValues: Boolean) = {
-    this(Accumulators.newId(), initialValue, param, name, countFailedValues)
+    this(AccumulatorContext.newId(), initialValue, param, name, 
countFailedValues)
   }
 
   private[spark] def this(initialValue: R, param: AccumulableParam[R, T], 
name: Option[String]) = {
@@ -72,34 +72,23 @@ class Accumulable[R, T] private (
 
   def this(initialValue: R, param: AccumulableParam[R, T]) = 
this(initialValue, param, None)
 
-  @volatile @transient private var value_ : R = initialValue // Current value 
on driver
-  val zero = param.zero(initialValue) // Zero value to be passed to executors
-  private var deserialized = false
-
-  Accumulators.register(this)
-
-  /**
-   * Return a copy of this [[Accumulable]].
-   *
-   * The copy will have the same ID as the original and will not be registered 
with
-   * [[Accumulators]] again. This method exists so that the caller can avoid 
passing the
-   * same mutable instance around.
-   */
-  private[spark] def copy(): Accumulable[R, T] = {
-    new Accumulable[R, T](id, initialValue, param, name, countFailedValues)
-  }
+  val zero = param.zero(initialValue)
+  private[spark] val newAcc = new LegacyAccumulatorWrapper(initialValue, param)
+  newAcc.metadata = AccumulatorMetadata(id, name, countFailedValues)
+  // Register the new accumulator in ctor, to follow the previous behaviour.
+  AccumulatorContext.register(newAcc)
 
   /**
    * Add more data to this accumulator / accumulable
    * @param term the data to add
    */
-  def += (term: T) { value_ = param.addAccumulator(value_, term) }
+  def += (term: T) { newAcc.add(term) }
 
   /**
    * Add more data to this accumulator / accumulable
    * @param term the data to add
    */
-  def add(term: T) { value_ = param.addAccumulator(value_, term) }
+  def add(term: T) { newAcc.add(term) }
 
   /**
    * Merge two accumulable objects together
@@ -107,7 +96,7 @@ class Accumulable[R, T] private (
    * Normally, a user will not want to use this version, but will instead call 
`+=`.
    * @param term the other `R` that will get merged with this
    */
-  def ++= (term: R) { value_ = param.addInPlace(value_, term)}
+  def ++= (term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }
 
   /**
    * Merge two accumulable objects together
@@ -115,18 +104,12 @@ class Accumulable[R, T] private (
    * Normally, a user will not want to use this version, but will instead call 
`add`.
    * @param term the other `R` that will get merged with this
    */
-  def merge(term: R) { value_ = param.addInPlace(value_, term)}
+  def merge(term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }
 
   /**
    * Access the accumulator's current value; only allowed on driver.
    */
-  def value: R = {
-    if (!deserialized) {
-      value_
-    } else {
-      throw new UnsupportedOperationException("Can't read accumulator value in 
task")
-    }
-  }
+  def value: R = newAcc.value
 
   /**
    * Get the current value of this accumulator from within a task.
@@ -137,14 +120,14 @@ class Accumulable[R, T] private (
    * The typical use of this method is to directly mutate the local value, 
eg., to add
    * an element to a Set.
    */
-  def localValue: R = value_
+  def localValue: R = newAcc.localValue
 
   /**
    * Set the accumulator's value; only allowed on driver.
    */
   def value_= (newValue: R) {
-    if (!deserialized) {
-      value_ = newValue
+    if (newAcc.isAtDriverSide) {
+      newAcc._value = newValue
     } else {
       throw new UnsupportedOperationException("Can't assign accumulator value 
in task")
     }
@@ -153,7 +136,7 @@ class Accumulable[R, T] private (
   /**
    * Set the accumulator's value. For internal use only.
    */
-  def setValue(newValue: R): Unit = { value_ = newValue }
+  def setValue(newValue: R): Unit = { newAcc._value = newValue }
 
   /**
    * Set the accumulator's value. For internal use only.
@@ -168,22 +151,7 @@ class Accumulable[R, T] private (
     new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
   }
 
-  // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
-    in.defaultReadObject()
-    value_ = zero
-    deserialized = true
-
-    // Automatically register the accumulator when it is deserialized with the 
task closure.
-    // This is for external accumulators and internal ones that do not 
represent task level
-    // metrics, e.g. internal SQL metrics, which are per-operator.
-    val taskContext = TaskContext.get()
-    if (taskContext != null) {
-      taskContext.registerAccumulator(this)
-    }
-  }
-
-  override def toString: String = if (value_ == null) "null" else 
value_.toString
+  override def toString: String = if (newAcc._value == null) "null" else 
newAcc._value.toString
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/Accumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala 
b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 0c17f01..9b007b9 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -68,73 +68,6 @@ class Accumulator[T] private[spark] (
   extends Accumulable[T, T](initialValue, param, name, countFailedValues)
 
 
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private[spark] object Accumulators extends Logging {
-  /**
-   * This global map holds the original accumulator objects that are created 
on the driver.
-   * It keeps weak references to these objects so that accumulators can be 
garbage-collected
-   * once the RDDs and user-code that reference them are cleaned up.
-   * TODO: Don't use a global map; these should be tied to a SparkContext 
(SPARK-13051).
-   */
-  @GuardedBy("Accumulators")
-  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-
-  private val nextId = new AtomicLong(0L)
-
-  /**
-   * Return a globally unique ID for a new [[Accumulable]].
-   * Note: Once you copy the [[Accumulable]] the ID is no longer unique.
-   */
-  def newId(): Long = nextId.getAndIncrement
-
-  /**
-   * Register an [[Accumulable]] created on the driver such that it can be 
used on the executors.
-   *
-   * All accumulators registered here can later be used as a container for 
accumulating partial
-   * values across multiple tasks. This is what 
[[org.apache.spark.scheduler.DAGScheduler]] does.
-   * Note: if an accumulator is registered here, it should also be registered 
with the active
-   * context cleaner for cleanup so as to avoid memory leaks.
-   *
-   * If an [[Accumulable]] with the same ID was already registered, this does 
nothing instead
-   * of overwriting it. This happens when we copy accumulators, e.g. when we 
reconstruct
-   * [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
-   */
-  def register(a: Accumulable[_, _]): Unit = synchronized {
-    if (!originals.contains(a.id)) {
-      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
-    }
-  }
-
-  /**
-   * Unregister the [[Accumulable]] with the given ID, if any.
-   */
-  def remove(accId: Long): Unit = synchronized {
-    originals.remove(accId)
-  }
-
-  /**
-   * Return the [[Accumulable]] registered with the given ID, if any.
-   */
-  def get(id: Long): Option[Accumulable[_, _]] = synchronized {
-    originals.get(id).map { weakRef =>
-      // Since we are storing weak references, we must check whether the 
underlying data is valid.
-      weakRef.get.getOrElse {
-        throw new IllegalAccessError(s"Attempted to access garbage collected 
accumulator $id")
-      }
-    }
-  }
-
-  /**
-   * Clear all registered [[Accumulable]]s. For testing only.
-   */
-  def clear(): Unit = synchronized {
-    originals.clear()
-  }
-
-}
-
-
 /**
  * A simpler version of [[org.apache.spark.AccumulableParam]] where the only 
data type you can add
  * in is the same type as the accumulated value. An implicit AccumulatorParam 
object needs to be

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 76692cc..63a00a8 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -144,7 +144,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
     registerForCleanup(rdd, CleanRDD(rdd.id))
   }
 
-  def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+  def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
     registerForCleanup(a, CleanAccum(a.id))
   }
 
@@ -241,7 +241,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
   def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
     try {
       logDebug("Cleaning accumulator " + accId)
-      Accumulators.remove(accId)
+      AccumulatorContext.remove(accId)
       listeners.asScala.foreach(_.accumCleaned(accId))
       logInfo("Cleaned accumulator " + accId)
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 2bdbd3f..9eac05f 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, 
ThreadUtils, Utils}
  */
 private[spark] case class Heartbeat(
     executorId: String,
-    accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum 
updates
+    accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], // taskId -> 
accumulator updates
     blockManagerId: BlockManagerId)
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/NewAccumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala 
b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
new file mode 100644
index 0000000..edb9b74
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.{lang => jl}
+import java.io.ObjectInputStream
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.util.Utils
+
+
+private[spark] case class AccumulatorMetadata(
+    id: Long,
+    name: Option[String],
+    countFailedValues: Boolean) extends Serializable
+
+
+/**
+ * The base class for accumulators, that can accumulate inputs of type `IN`, 
and produce output of
+ * type `OUT`.
+ */
+abstract class NewAccumulator[IN, OUT] extends Serializable {
+  private[spark] var metadata: AccumulatorMetadata = _
+  private[this] var atDriverSide = true
+
+  private[spark] def register(
+      sc: SparkContext,
+      name: Option[String] = None,
+      countFailedValues: Boolean = false): Unit = {
+    if (this.metadata != null) {
+      throw new IllegalStateException("Cannot register an Accumulator twice.")
+    }
+    this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, 
countFailedValues)
+    AccumulatorContext.register(this)
+    sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
+  }
+
+  /**
+   * Returns true if this accumulator has been registered.  Note that all 
accumulators must be
+   * registered before use, or an exception will be thrown.
+   */
+  final def isRegistered: Boolean =
+    metadata != null && AccumulatorContext.originals.containsKey(metadata.id)
+
+  private def assertMetadataNotNull(): Unit = {
+    if (metadata == null) {
+      throw new IllegalAccessError("The metadata of this accumulator has not 
been assigned yet.")
+    }
+  }
+
+  /**
+   * Returns the id of this accumulator, can only be called after registration.
+   */
+  final def id: Long = {
+    assertMetadataNotNull()
+    metadata.id
+  }
+
+  /**
+   * Returns the name of this accumulator, can only be called after 
registration.
+   */
+  final def name: Option[String] = {
+    assertMetadataNotNull()
+    metadata.name
+  }
+
+  /**
+   * Whether to accumulate values from failed tasks. This is set to true for 
system and time
+   * metrics like serialization time or bytes spilled, and false for things 
with absolute values
+   * like number of input rows.  This should be used for internal metrics only.
+   */
+  private[spark] final def countFailedValues: Boolean = {
+    assertMetadataNotNull()
+    metadata.countFailedValues
+  }
+
+  /**
+   * Creates an [[AccumulableInfo]] representation of this [[NewAccumulator]] 
with the provided
+   * values.
+   */
+  private[spark] def toInfo(update: Option[Any], value: Option[Any]): 
AccumulableInfo = {
+    val isInternal = 
name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+    new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
+  }
+
+  final private[spark] def isAtDriverSide: Boolean = atDriverSide
+
+  /**
+   * Tells if this accumulator is zero value or not. e.g. for a counter 
accumulator, 0 is zero
+   * value; for a list accumulator, Nil is zero value.
+   */
+  def isZero(): Boolean
+
+  /**
+   * Creates a new copy of this accumulator, which is zero value. i.e. call 
`isZero` on the copy
+   * must return true.
+   */
+  def copyAndReset(): NewAccumulator[IN, OUT]
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   */
+  def add(v: IN): Unit
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, 
i.e. this should be
+   * merge-in-place.
+   */
+  def merge(other: NewAccumulator[IN, OUT]): Unit
+
+  /**
+   * Access this accumulator's current value; only allowed on driver.
+   */
+  final def value: OUT = {
+    if (atDriverSide) {
+      localValue
+    } else {
+      throw new UnsupportedOperationException("Can't read accumulator value in 
task")
+    }
+  }
+
+  /**
+   * Defines the current value of this accumulator.
+   *
+   * This is NOT the global value of the accumulator.  To get the global value 
after a
+   * completed operation on the dataset, call `value`.
+   */
+  def localValue: OUT
+
+  // Called by Java when serializing an object
+  final protected def writeReplace(): Any = {
+    if (atDriverSide) {
+      if (!isRegistered) {
+        throw new UnsupportedOperationException(
+          "Accumulator must be registered before send to executor")
+      }
+      val copy = copyAndReset()
+      assert(copy.isZero(), "copyAndReset must return a zero value copy")
+      copy.metadata = metadata
+      copy
+    } else {
+      this
+    }
+  }
+
+  // Called by Java when deserializing an object
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException 
{
+    in.defaultReadObject()
+    if (atDriverSide) {
+      atDriverSide = false
+
+      // Automatically register the accumulator when it is deserialized with 
the task closure.
+      // This is for external accumulators and internal ones that do not 
represent task level
+      // metrics, e.g. internal SQL metrics, which are per-operator.
+      val taskContext = TaskContext.get()
+      if (taskContext != null) {
+        taskContext.registerAccumulator(this)
+      }
+    } else {
+      atDriverSide = true
+    }
+  }
+
+  override def toString: String = {
+    if (metadata == null) {
+      "Un-registered Accumulator: " + getClass.getSimpleName
+    } else {
+      getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)"
+    }
+  }
+}
+
+
+private[spark] object AccumulatorContext {
+
+  /**
+   * This global map holds the original accumulator objects that are created 
on the driver.
+   * It keeps weak references to these objects so that accumulators can be 
garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   * TODO: Don't use a global map; these should be tied to a SparkContext 
(SPARK-13051).
+   */
+  @GuardedBy("AccumulatorContext")
+  val originals = new java.util.HashMap[Long, 
jl.ref.WeakReference[NewAccumulator[_, _]]]
+
+  private[this] val nextId = new AtomicLong(0L)
+
+  /**
+   * Return a globally unique ID for a new [[Accumulator]].
+   * Note: Once you copy the [[Accumulator]] the ID is no longer unique.
+   */
+  def newId(): Long = nextId.getAndIncrement
+
+  /**
+   * Register an [[Accumulator]] created on the driver such that it can be 
used on the executors.
+   *
+   * All accumulators registered here can later be used as a container for 
accumulating partial
+   * values across multiple tasks. This is what 
[[org.apache.spark.scheduler.DAGScheduler]] does.
+   * Note: if an accumulator is registered here, it should also be registered 
with the active
+   * context cleaner for cleanup so as to avoid memory leaks.
+   *
+   * If an [[Accumulator]] with the same ID was already registered, this does 
nothing instead
+   * of overwriting it. We will never register same accumulator twice, this is 
just a sanity check.
+   */
+  def register(a: NewAccumulator[_, _]): Unit = synchronized {
+    if (!originals.containsKey(a.id)) {
+      originals.put(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a))
+    }
+  }
+
+  /**
+   * Unregister the [[Accumulator]] with the given ID, if any.
+   */
+  def remove(id: Long): Unit = synchronized {
+    originals.remove(id)
+  }
+
+  /**
+   * Return the [[Accumulator]] registered with the given ID, if any.
+   */
+  def get(id: Long): Option[NewAccumulator[_, _]] = synchronized {
+    Option(originals.get(id)).map { ref =>
+      // Since we are storing weak references, we must check whether the 
underlying data is valid.
+      val acc = ref.get
+      if (acc eq null) {
+        throw new IllegalAccessError(s"Attempted to access garbage collected 
accumulator $id")
+      }
+      acc
+    }
+  }
+
+  /**
+   * Clear all registered [[Accumulator]]s. For testing only.
+   */
+  def clear(): Unit = synchronized {
+    originals.clear()
+  }
+}
+
+
+class LongAccumulator extends NewAccumulator[jl.Long, jl.Long] {
+  private[this] var _sum = 0L
+
+  override def isZero(): Boolean = _sum == 0
+
+  override def copyAndReset(): LongAccumulator = new LongAccumulator
+
+  override def add(v: jl.Long): Unit = _sum += v
+
+  def add(v: Long): Unit = _sum += v
+
+  def sum: Long = _sum
+
+  override def merge(other: NewAccumulator[jl.Long, jl.Long]): Unit = other 
match {
+    case o: LongAccumulator => _sum += o.sum
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  private[spark] def setValue(newValue: Long): Unit = _sum = newValue
+
+  override def localValue: jl.Long = _sum
+}
+
+
+class DoubleAccumulator extends NewAccumulator[jl.Double, jl.Double] {
+  private[this] var _sum = 0.0
+
+  override def isZero(): Boolean = _sum == 0.0
+
+  override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator
+
+  override def add(v: jl.Double): Unit = _sum += v
+
+  def add(v: Double): Unit = _sum += v
+
+  def sum: Double = _sum
+
+  override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = 
other match {
+    case o: DoubleAccumulator => _sum += o.sum
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  private[spark] def setValue(newValue: Double): Unit = _sum = newValue
+
+  override def localValue: jl.Double = _sum
+}
+
+
+class AverageAccumulator extends NewAccumulator[jl.Double, jl.Double] {
+  private[this] var _sum = 0.0
+  private[this] var _count = 0L
+
+  override def isZero(): Boolean = _sum == 0.0 && _count == 0
+
+  override def copyAndReset(): AverageAccumulator = new AverageAccumulator
+
+  override def add(v: jl.Double): Unit = {
+    _sum += v
+    _count += 1
+  }
+
+  def add(d: Double): Unit = {
+    _sum += d
+    _count += 1
+  }
+
+  override def merge(other: NewAccumulator[jl.Double, jl.Double]): Unit = 
other match {
+    case o: AverageAccumulator =>
+      _sum += o.sum
+      _count += o.count
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def localValue: jl.Double = if (_count == 0) {
+    Double.NaN
+  } else {
+    _sum / _count
+  }
+
+  def sum: Double = _sum
+
+  def count: Long = _count
+}
+
+
+class ListAccumulator[T] extends NewAccumulator[T, java.util.List[T]] {
+  private[this] val _list: java.util.List[T] = new java.util.ArrayList[T]
+
+  override def isZero(): Boolean = _list.isEmpty
+
+  override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
+
+  override def add(v: T): Unit = _list.add(v)
+
+  override def merge(other: NewAccumulator[T, java.util.List[T]]): Unit = 
other match {
+    case o: ListAccumulator[T] => _list.addAll(o.localValue)
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def localValue: java.util.List[T] = 
java.util.Collections.unmodifiableList(_list)
+
+  private[spark] def setValue(newValue: java.util.List[T]): Unit = {
+    _list.clear()
+    _list.addAll(newValue)
+  }
+}
+
+
+class LegacyAccumulatorWrapper[R, T](
+    initialValue: R,
+    param: org.apache.spark.AccumulableParam[R, T]) extends NewAccumulator[T, 
R] {
+  private[spark] var _value = initialValue  // Current value on driver
+
+  override def isZero(): Boolean = _value == param.zero(initialValue)
+
+  override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = {
+    val acc = new LegacyAccumulatorWrapper(initialValue, param)
+    acc._value = param.zero(initialValue)
+    acc
+  }
+
+  override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
+
+  override def merge(other: NewAccumulator[T, R]): Unit = other match {
+    case o: LegacyAccumulatorWrapper[R, T] => _value = 
param.addInPlace(_value, o.localValue)
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def localValue: R = _value
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f322a77..865989a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1217,10 +1217,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
-  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] =
-  {
+  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
     val acc = new Accumulator(initialValue, param)
-    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+    cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
     acc
   }
 
@@ -1232,7 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
     : Accumulator[T] = {
     val acc = new Accumulator(initialValue, param, Some(name))
-    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+    cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
     acc
   }
 
@@ -1245,7 +1244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param)
-    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+    cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
     acc
   }
 
@@ -1259,7 +1258,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param, Some(name))
-    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+    cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
     acc
   }
 
@@ -1273,7 +1272,101 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       (initialValue: R): Accumulable[R, T] = {
     val param = new GrowableAccumulableParam[R, T]
     val acc = new Accumulable(initialValue, param)
-    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+    cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
+    acc
+  }
+
+  /**
+   * Register the given accumulator.  Note that accumulators must be registered before use, or it
+   * will throw exception.
+   */
+  def register(acc: NewAccumulator[_, _]): Unit = {
+    acc.register(this)
+  }
+
+  /**
+   * Register the given accumulator with given name.  Note that accumulators must be registered
+   * before use, or it will throw exception.
+   */
+  def register(acc: NewAccumulator[_, _], name: String): Unit = {
+    acc.register(this, name = Some(name))
+  }
+
+  /**
+   * Create and register a long accumulator, which starts with 0 and accumulates inputs by `+=`.
+   */
+  def longAccumulator: LongAccumulator = {
+    val acc = new LongAccumulator
+    register(acc)
+    acc
+  }
+
+  /**
+   * Create and register a long accumulator, which starts with 0 and accumulates inputs by `+=`.
+   */
+  def longAccumulator(name: String): LongAccumulator = {
+    val acc = new LongAccumulator
+    register(acc, name)
+    acc
+  }
+
+  /**
+   * Create and register a double accumulator, which starts with 0 and accumulates inputs by `+=`.
+   */
+  def doubleAccumulator: DoubleAccumulator = {
+    val acc = new DoubleAccumulator
+    register(acc)
+    acc
+  }
+
+  /**
+   * Create and register a double accumulator, which starts with 0 and accumulates inputs by `+=`.
+   */
+  def doubleAccumulator(name: String): DoubleAccumulator = {
+    val acc = new DoubleAccumulator
+    register(acc, name)
+    acc
+  }
+
+  /**
+   * Create and register an average accumulator, which accumulates double inputs by recording the
+   * total sum and total count, and produce the output by sum / total.  Note that Double.NaN will be
+   * returned if no input is added.
+   */
+  def averageAccumulator: AverageAccumulator = {
+    val acc = new AverageAccumulator
+    register(acc)
+    acc
+  }
+
+  /**
+   * Create and register an average accumulator, which accumulates double inputs by recording the
+   * total sum and total count, and produce the output by sum / total.  Note that Double.NaN will be
+   * returned if no input is added.
+   */
+  def averageAccumulator(name: String): AverageAccumulator = {
+    val acc = new AverageAccumulator
+    register(acc, name)
+    acc
+  }
+
+  /**
+   * Create and register a list accumulator, which starts with empty list and accumulates inputs
+   * by adding them into the inner list.
+   */
+  def listAccumulator[T]: ListAccumulator[T] = {
+    val acc = new ListAccumulator[T]
+    register(acc)
+    acc
+  }
+
+  /**
+   * Create and register a list accumulator, which starts with empty list and accumulates inputs
+   * by adding them into the inner list.
+   */
+  def listAccumulator[T](name: String): ListAccumulator[T] = {
+    val acc = new ListAccumulator[T]
+    register(acc, name)
     acc
   }
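
The average accumulator documented above keeps an intermediate state (a running sum and a count) that differs from its result type (a single `Double`). The following standalone sketch illustrates that idea. `SketchAverage` is a hypothetical class written for this note, not the `AverageAccumulator` added by the patch, and its method names are assumptions.

```scala
// Hypothetical illustration only; not Spark's AverageAccumulator.
final class SketchAverage {
  // Intermediate state: running sum and number of inputs seen.
  private var sum = 0.0
  private var count = 0L

  def add(v: Double): Unit = {
    sum += v
    count += 1
  }

  // Merging combines intermediate states, never final averages.
  def merge(other: SketchAverage): Unit = {
    sum += other.sum
    count += other.count
  }

  // With no inputs this is 0.0 / 0, which is Double.NaN.
  def value: Double = sum / count
}

object SketchAverageDemo {
  def run(): Double = {
    val a = new SketchAverage
    a.add(1.0)
    a.add(2.0)
    val b = new SketchAverage
    b.add(6.0)
    a.merge(b)
    a.value
  }
}
```

Merging the two partial averages directly (1.5 and 6.0) would give the wrong answer, which is why the intermediate state has to carry both the sum and the count.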
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index e7940bd..9e53257 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -188,6 +188,6 @@ abstract class TaskContext extends Serializable {
    * Register an accumulator that belongs to this task. Accumulators must call this method when
    * deserializing in executors.
    */
-  private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit
+  private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 43e5556..bc3807f 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -122,7 +122,7 @@ private[spark] class TaskContextImpl(
   override def getMetricsSources(sourceName: String): Seq[Source] =
     metricsSystem.getSourcesByName(sourceName)
 
-  private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = {
+  private[spark] override def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
     taskMetrics.registerAccumulator(a)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 7487cfe..82ba2d0 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -19,10 +19,7 @@ package org.apache.spark
 
 import java.io.{ObjectInputStream, ObjectOutputStream}
 
-import scala.util.Try
-
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.storage.BlockManagerId
@@ -120,18 +117,10 @@ case class ExceptionFailure(
     stackTrace: Array[StackTraceElement],
     fullStackTrace: String,
     private val exceptionWrapper: Option[ThrowableSerializationWrapper],
-    accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] var accums: Seq[NewAccumulator[_, _]] = Nil)
   extends TaskFailedReason {
 
-  @deprecated("use accumUpdates instead", "2.0.0")
-  val metrics: Option[TaskMetrics] = {
-    if (accumUpdates.nonEmpty) {
-      Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption
-    } else {
-      None
-    }
-  }
-
   /**
    * `preserveCause` is used to keep the exception itself so it is available to the
    * driver. This may be set to `false` in the event that the exception is not in fact
@@ -149,6 +138,11 @@ case class ExceptionFailure(
     this(e, accumUpdates, preserveCause = true)
   }
 
+  private[spark] def withAccums(accums: Seq[NewAccumulator[_, _]]): ExceptionFailure = {
+    this.accums = accums
+    this
+  }
+
   def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))
 
   override def toErrorString: String =

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 650f05c..4d61f7e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -353,22 +353,24 @@ private[spark] class Executor(
           logError(s"Exception in $taskName (TID $taskId)", t)
 
           // Collect latest accumulator values to report back to the driver
-          val accumulatorUpdates: Seq[AccumulableInfo] =
+          val accums: Seq[NewAccumulator[_, _]] =
             if (task != null) {
               task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
               task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
               task.collectAccumulatorUpdates(taskFailed = true)
             } else {
-              Seq.empty[AccumulableInfo]
+              Seq.empty
             }
 
+          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.localValue), None))
+
           val serializedTaskEndReason = {
             try {
-              ser.serialize(new ExceptionFailure(t, accumulatorUpdates))
+              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
             } catch {
               case _: NotSerializableException =>
                 // t is not serializable so just send the stacktrace
-                ser.serialize(new ExceptionFailure(t, accumulatorUpdates, preserveCause = false))
+                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
             }
           }
           execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
@@ -476,14 +478,14 @@ private[spark] class Executor(
   /** Reports heartbeat and metrics for active tasks to the driver. */
   private def reportHeartBeat(): Unit = {
     // list of (task id, accumUpdates) to send back to the driver
-    val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulableInfo])]()
+    val accumUpdates = new ArrayBuffer[(Long, Seq[NewAccumulator[_, _]])]()
     val curGCTime = computeTotalGcTime()
 
     for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
         taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
-        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates()))
+        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 535352e..6f7160a 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -40,20 +40,18 @@ object DataReadMethod extends Enumeration with Serializable {
  */
 @DeveloperApi
 class InputMetrics private[spark] () extends Serializable {
-  import InternalAccumulator._
-
-  private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ)
-  private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ)
+  private[executor] val _bytesRead = new LongAccumulator
+  private[executor] val _recordsRead = new LongAccumulator
 
   /**
    * Total number of bytes read.
    */
-  def bytesRead: Long = _bytesRead.localValue
+  def bytesRead: Long = _bytesRead.sum
 
   /**
    * Total number of records read.
    */
-  def recordsRead: Long = _recordsRead.localValue
+  def recordsRead: Long = _recordsRead.sum
 
   private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
   private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 586c98b..db3924c 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -39,20 +39,18 @@ object DataWriteMethod extends Enumeration with Serializable {
  */
 @DeveloperApi
 class OutputMetrics private[spark] () extends Serializable {
-  import InternalAccumulator._
-
-  private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN)
-  private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN)
+  private[executor] val _bytesWritten = new LongAccumulator
+  private[executor] val _recordsWritten = new LongAccumulator
 
   /**
    * Total number of bytes written.
    */
-  def bytesWritten: Long = _bytesWritten.localValue
+  def bytesWritten: Long = _bytesWritten.sum
 
   /**
    * Total number of records written.
    */
-  def recordsWritten: Long = _recordsWritten.localValue
+  def recordsWritten: Long = _recordsWritten.sum
 
   private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
   private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index f012a74..fa96210 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -28,52 +28,44 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 class ShuffleReadMetrics private[spark] () extends Serializable {
-  import InternalAccumulator._
-
-  private[executor] val _remoteBlocksFetched =
-    TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED)
-  private[executor] val _localBlocksFetched =
-    TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED)
-  private[executor] val _remoteBytesRead =
-    TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ)
-  private[executor] val _localBytesRead =
-    TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ)
-  private[executor] val _fetchWaitTime =
-    TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME)
-  private[executor] val _recordsRead =
-    TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ)
+  private[executor] val _remoteBlocksFetched = new LongAccumulator
+  private[executor] val _localBlocksFetched = new LongAccumulator
+  private[executor] val _remoteBytesRead = new LongAccumulator
+  private[executor] val _localBytesRead = new LongAccumulator
+  private[executor] val _fetchWaitTime = new LongAccumulator
+  private[executor] val _recordsRead = new LongAccumulator
 
   /**
    * Number of remote blocks fetched in this shuffle by this task.
    */
-  def remoteBlocksFetched: Int = _remoteBlocksFetched.localValue
+  def remoteBlocksFetched: Long = _remoteBlocksFetched.sum
 
   /**
    * Number of local blocks fetched in this shuffle by this task.
    */
-  def localBlocksFetched: Int = _localBlocksFetched.localValue
+  def localBlocksFetched: Long = _localBlocksFetched.sum
 
   /**
    * Total number of remote bytes read from the shuffle by this task.
    */
-  def remoteBytesRead: Long = _remoteBytesRead.localValue
+  def remoteBytesRead: Long = _remoteBytesRead.sum
 
   /**
    * Shuffle data that was read from the local disk (as opposed to from a remote executor).
    */
-  def localBytesRead: Long = _localBytesRead.localValue
+  def localBytesRead: Long = _localBytesRead.sum
 
   /**
    * Time the task spent waiting for remote shuffle blocks. This only includes the time
    * blocking on shuffle input data. For instance if block B is being fetched while the task is
    * still not finished processing block A, it is not considered to be blocking on block B.
    */
-  def fetchWaitTime: Long = _fetchWaitTime.localValue
+  def fetchWaitTime: Long = _fetchWaitTime.sum
 
   /**
    * Total number of records read from the shuffle by this task.
    */
-  def recordsRead: Long = _recordsRead.localValue
+  def recordsRead: Long = _recordsRead.sum
 
   /**
    * Total bytes fetched in the shuffle by this task (both remote and local).
@@ -83,10 +75,10 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local).
    */
-  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+  def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
 
-  private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
-  private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
+  private[spark] def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched.add(v)
+  private[spark] def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched.add(v)
   private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
   private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
   private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
@@ -104,12 +96,12 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
    * [[TempShuffleReadMetrics]] into `this`.
    */
   private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
-    _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
-    _localBlocksFetched.setValue(_localBlocksFetched.zero)
-    _remoteBytesRead.setValue(_remoteBytesRead.zero)
-    _localBytesRead.setValue(_localBytesRead.zero)
-    _fetchWaitTime.setValue(_fetchWaitTime.zero)
-    _recordsRead.setValue(_recordsRead.zero)
+    _remoteBlocksFetched.setValue(0)
+    _localBlocksFetched.setValue(0)
+    _remoteBytesRead.setValue(0)
+    _localBytesRead.setValue(0)
+    _fetchWaitTime.setValue(0)
+    _recordsRead.setValue(0)
     metrics.foreach { metric =>
       _remoteBlocksFetched.add(metric.remoteBlocksFetched)
       _localBlocksFetched.add(metric.localBlocksFetched)
@@ -127,22 +119,22 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
  * last.
  */
 private[spark] class TempShuffleReadMetrics {
-  private[this] var _remoteBlocksFetched = 0
-  private[this] var _localBlocksFetched = 0
+  private[this] var _remoteBlocksFetched = 0L
+  private[this] var _localBlocksFetched = 0L
   private[this] var _remoteBytesRead = 0L
   private[this] var _localBytesRead = 0L
   private[this] var _fetchWaitTime = 0L
   private[this] var _recordsRead = 0L
 
-  def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v
-  def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v
+  def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v
+  def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v
   def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
   def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
   def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
   def incRecordsRead(v: Long): Unit = _recordsRead += v
 
-  def remoteBlocksFetched: Int = _remoteBlocksFetched
-  def localBlocksFetched: Int = _localBlocksFetched
+  def remoteBlocksFetched: Long = _remoteBlocksFetched
+  def localBlocksFetched: Long = _localBlocksFetched
   def remoteBytesRead: Long = _remoteBytesRead
   def localBytesRead: Long = _localBytesRead
   def fetchWaitTime: Long = _fetchWaitTime
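
The `setMergeValues` change above resets each accumulator to 0 and then adds the values from every `TempShuffleReadMetrics`. A minimal standalone sketch of that reset-then-sum pattern follows. `SketchTempMetrics` and `SketchReadMetrics` are hypothetical names, and tracking a single counter is a simplification made for this illustration.

```scala
// Hypothetical per-dependency counter, modelled loosely on the temp metrics class.
final class SketchTempMetrics {
  private var _recordsRead = 0L
  def incRecordsRead(v: Long): Unit = _recordsRead += v
  def recordsRead: Long = _recordsRead
}

// Hypothetical aggregate that is rebuilt from the temp counters on demand.
final class SketchReadMetrics {
  private var _recordsRead = 0L
  def recordsRead: Long = _recordsRead

  // Reset first, then sum: calling this repeatedly never double-counts.
  def setMergeValues(temps: Seq[SketchTempMetrics]): Unit = {
    _recordsRead = 0L
    temps.foreach(t => _recordsRead += t.recordsRead)
  }
}

object SketchMergeDemo {
  def run(): Long = {
    val t1 = new SketchTempMetrics
    t1.incRecordsRead(3L)
    val t2 = new SketchTempMetrics
    t2.incRecordsRead(4L)
    val agg = new SketchReadMetrics
    agg.setMergeValues(Seq(t1, t2))
    agg.setMergeValues(Seq(t1, t2)) // second call yields the same total
    agg.recordsRead
  }
}
```

Resetting before summing makes the merge idempotent in this sketch, which matters if the merge runs more than once over the same temp counters.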

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 7326fba..0e70a4f 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import org.apache.spark.InternalAccumulator
+import org.apache.spark.LongAccumulator
 import org.apache.spark.annotation.DeveloperApi
 
 
@@ -28,29 +28,24 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 class ShuffleWriteMetrics private[spark] () extends Serializable {
-  import InternalAccumulator._
-
-  private[executor] val _bytesWritten =
-    TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN)
-  private[executor] val _recordsWritten =
-    TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN)
-  private[executor] val _writeTime =
-    TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME)
+  private[executor] val _bytesWritten = new LongAccumulator
+  private[executor] val _recordsWritten = new LongAccumulator
+  private[executor] val _writeTime = new LongAccumulator
 
   /**
    * Number of bytes written for the shuffle by this task.
    */
-  def bytesWritten: Long = _bytesWritten.localValue
+  def bytesWritten: Long = _bytesWritten.sum
 
   /**
    * Total number of records written to the shuffle by this task.
    */
-  def recordsWritten: Long = _recordsWritten.localValue
+  def recordsWritten: Long = _recordsWritten.sum
 
   /**
    * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds.
    */
-  def writeTime: Long = _writeTime.localValue
+  def writeTime: Long = _writeTime.sum
 
   private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
   private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 8513d05..0b64917 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.executor
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
 import org.apache.spark._
-import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.AccumulableInfo
@@ -42,53 +41,51 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  */
 @DeveloperApi
 class TaskMetrics private[spark] () extends Serializable {
-  import InternalAccumulator._
-
   // Each metric is internally represented as an accumulator
-  private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME)
-  private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME)
-  private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE)
-  private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME)
-  private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME)
-  private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED)
-  private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED)
-  private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY)
-  private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES)
+  private val _executorDeserializeTime = new LongAccumulator
+  private val _executorRunTime = new LongAccumulator
+  private val _resultSize = new LongAccumulator
+  private val _jvmGCTime = new LongAccumulator
+  private val _resultSerializationTime = new LongAccumulator
+  private val _memoryBytesSpilled = new LongAccumulator
+  private val _diskBytesSpilled = new LongAccumulator
+  private val _peakExecutionMemory = new LongAccumulator
+  private val _updatedBlockStatuses = new BlockStatusesAccumulator
 
   /**
    * Time taken on the executor to deserialize this task.
    */
-  def executorDeserializeTime: Long = _executorDeserializeTime.localValue
+  def executorDeserializeTime: Long = _executorDeserializeTime.sum
 
   /**
    * Time the executor spends actually running the task (including fetching shuffle data).
    */
-  def executorRunTime: Long = _executorRunTime.localValue
+  def executorRunTime: Long = _executorRunTime.sum
 
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult.
    */
-  def resultSize: Long = _resultSize.localValue
+  def resultSize: Long = _resultSize.sum
 
   /**
    * Amount of time the JVM spent in garbage collection while executing this task.
    */
-  def jvmGCTime: Long = _jvmGCTime.localValue
+  def jvmGCTime: Long = _jvmGCTime.sum
 
   /**
    * Amount of time spent serializing the task result.
    */
-  def resultSerializationTime: Long = _resultSerializationTime.localValue
+  def resultSerializationTime: Long = _resultSerializationTime.sum
 
   /**
    * The number of in-memory bytes spilled by this task.
    */
-  def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue
+  def memoryBytesSpilled: Long = _memoryBytesSpilled.sum
 
   /**
    * The number of on-disk bytes spilled by this task.
    */
-  def diskBytesSpilled: Long = _diskBytesSpilled.localValue
+  def diskBytesSpilled: Long = _diskBytesSpilled.sum
 
   /**
    * Peak memory used by internal data structures created during shuffles, aggregations and
@@ -96,7 +93,7 @@ class TaskMetrics private[spark] () extends Serializable {
    * across all such data structures created in this task. For SQL jobs, this only tracks all
    * unsafe operators and ExternalSort.
    */
-  def peakExecutionMemory: Long = _peakExecutionMemory.localValue
+  def peakExecutionMemory: Long = _peakExecutionMemory.sum
 
   /**
    * Storage statuses of any blocks that have been updated as a result of this task.
@@ -114,7 +111,7 @@ class TaskMetrics private[spark] () extends Serializable {
   private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
   private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
   private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
-  private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
+  private[spark] def incUpdatedBlockStatuses(v: (BlockId, BlockStatus)): Unit =
     _updatedBlockStatuses.add(v)
   private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
     _updatedBlockStatuses.setValue(v)
@@ -175,124 +172,143 @@ class TaskMetrics private[spark] () extends Serializable {
   }
 
   // Only used for test
-  private[spark] val testAccum =
-    sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM))
-
-  @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = {
-    val in = inputMetrics
-    val out = outputMetrics
-    val sr = shuffleReadMetrics
-    val sw = shuffleWriteMetrics
-    Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime,
-      _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory,
-      _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead,
-      sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten,
-      sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++
-      testAccum
-  }
+  private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
+
+
+  import InternalAccumulator._
+  @transient private[spark] lazy val nameToAccums = LinkedHashMap(
+    EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
+    EXECUTOR_RUN_TIME -> _executorRunTime,
+    RESULT_SIZE -> _resultSize,
+    JVM_GC_TIME -> _jvmGCTime,
+    RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
+    MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
+    DISK_BYTES_SPILLED -> _diskBytesSpilled,
+    PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
+    UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
+    shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
+    shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
+    shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
+    shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
+    shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
+    shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
+    shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
+    shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
+    shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
+    input.BYTES_READ -> inputMetrics._bytesRead,
+    input.RECORDS_READ -> inputMetrics._recordsRead,
+    output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
+    output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
+  ) ++ testAccum.map(TEST_ACCUM -> _)
+
+  @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] =
+    nameToAccums.values.toIndexedSeq
 
   /* ========================== *
    |        OTHER THINGS        |
    * ========================== */
 
-  private[spark] def registerForCleanup(sc: SparkContext): Unit = {
-    internalAccums.foreach { accum =>
-      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
+  private[spark] def register(sc: SparkContext): Unit = {
+    nameToAccums.foreach {
+      case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true)
     }
   }
 
   /**
    * External accumulators registered with this task.
    */
-  @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]]
+  @transient private lazy val externalAccums = new ArrayBuffer[NewAccumulator[_, _]]
 
-  private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+  private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
     externalAccums += a
   }
 
-  /**
-   * Return the latest updates of accumulators in this task.
-   *
-   * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]]
-   * field is always empty, since this represents the partial updates recorded in this task,
-   * not the aggregated value across multiple tasks.
-   */
-  def accumulatorUpdates(): Seq[AccumulableInfo] = {
-    (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) }
-  }
+  private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = internalAccums ++ externalAccums
 }
 
-/**
- * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
- * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
- * which might have been garbage-collected. See SPARK-13407 for more details.
- *
- * Instances of this class should be considered read-only and users should not call `inc*()` or
- * `set*()` methods. While we could override the setter methods to throw
- * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
- * out-of-date when new metrics are added.
- */
-private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics {
-
-  override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
-
-  override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
-    throw new UnsupportedOperationException("This TaskMetrics is read-only")
-  }
-}
 
 private[spark] object TaskMetrics extends Logging {
+  import InternalAccumulator._
 
   /**
    * Create an empty task metrics that doesn't register its accumulators.
    */
   def empty: TaskMetrics = {
-    val metrics = new TaskMetrics
-    metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
-    metrics
+    val tm = new TaskMetrics
+    tm.nameToAccums.foreach { case (name, acc) =>
+      acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), true)
+    }
+    tm
+  }
+
+  def registered: TaskMetrics = {
+    val tm = empty
+    tm.internalAccums.foreach(AccumulatorContext.register)
+    tm
   }
 
   /**
-   * Create a new accumulator representing an internal task metric.
+   * Construct a [[TaskMetrics]] object from a list of [[AccumulableInfo]], called on driver only.
+   * The returned [[TaskMetrics]] is only used to get some internal metrics, we don't need to take
+   * care of external accumulator info passed in.
    */
-  private def newMetric[T](
-      initialValue: T,
-      name: String,
-      param: AccumulatorParam[T]): Accumulator[T] = {
-    new Accumulator[T](initialValue, param, Some(name), countFailedValues = true)
+  def fromAccumulatorInfos(infos: Seq[AccumulableInfo]): TaskMetrics = {
+    val tm = new TaskMetrics
+    infos.filter(info => info.name.isDefined && info.update.isDefined).foreach { info =>
+      val name = info.name.get
+      val value = info.update.get
+      if (name == UPDATED_BLOCK_STATUSES) {
+        tm.setUpdatedBlockStatuses(value.asInstanceOf[Seq[(BlockId, BlockStatus)]])
+      } else {
+        tm.nameToAccums.get(name).foreach(
+          _.asInstanceOf[LongAccumulator].setValue(value.asInstanceOf[Long])
+        )
+      }
+    }
+    tm
   }
 
-  def createLongAccum(name: String): Accumulator[Long] = {
-    newMetric(0L, name, LongAccumulatorParam)
-  }
+  /**
+   * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
+   */
+  def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = {
+    val tm = new TaskMetrics
+    val (internalAccums, externalAccums) =
+      accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get))
+
+    internalAccums.foreach { acc =>
+      val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]]
+      tmAcc.metadata = acc.metadata
+      tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]])
+    }
 
-  def createIntAccum(name: String): Accumulator[Int] = {
-    newMetric(0, name, IntAccumulatorParam)
+    tm.externalAccums ++= externalAccums
+    tm
   }
+}
+
+
+private[spark] class BlockStatusesAccumulator
+  extends NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] {
+  private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)]
 
-  def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = {
-    newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam)
+  override def isZero(): Boolean = _seq.isEmpty
+
+  override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator
+
+  override def add(v: (BlockId, BlockStatus)): Unit = _seq += v
+
+  override def merge(other: NewAccumulator[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]])
+  : Unit = other match {
+    case o: BlockStatusesAccumulator => _seq ++= o.localValue
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  /**
-   * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
-   *
-   * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we
-   * need the latter to post task end events to listeners, so we need to reconstruct the metrics
-   * on the driver.
-   *
-   * This assumes the provided updates contain the initial set of accumulators representing
-   * internal task level metrics.
-   */
-  def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
-    val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
-    val metrics = new ListenerTaskMetrics(definedAccumUpdates)
-    // We don't register this [[ListenerTaskMetrics]] for cleanup, and this is only used to post
-    // event, we should un-register all accumulators immediately.
-    metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
-    definedAccumUpdates.filter(_.internal).foreach { accum =>
-      metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
-    }
-    metrics
+  override def localValue: Seq[(BlockId, BlockStatus)] = _seq
+
+  def setValue(newValue: Seq[(BlockId, BlockStatus)]): Unit = {
+    _seq.clear()
+    _seq ++= newValue
   }
 }
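
Reviewer note: the contract that `BlockStatusesAccumulator` implements above (`isZero`, `copyAndReset`, `add`, `merge`, `localValue`) can be tried out in isolation. The following is a minimal sketch, not code from this patch: `SketchAccumulator`, `SeqAccumulator` and `AccumulatorSketch` are hypothetical names, and the real `NewAccumulator` also carries metadata and registration logic that is left out here.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the NewAccumulator contract shown in this diff.
// IN is the type of added elements, OUT is the type of the accumulated result.
abstract class SketchAccumulator[IN, OUT] {
  def isZero: Boolean
  def copyAndReset(): SketchAccumulator[IN, OUT]
  def add(v: IN): Unit
  def merge(other: SketchAccumulator[IN, OUT]): Unit
  def localValue: OUT
}

// Collects elements into a buffer, mirroring the shape of BlockStatusesAccumulator.
class SeqAccumulator[T] extends SketchAccumulator[T, Seq[T]] {
  private val buf = ArrayBuffer.empty[T]

  override def isZero: Boolean = buf.isEmpty

  // A fresh, empty accumulator of the same kind, as would be handed to a task.
  override def copyAndReset(): SeqAccumulator[T] = new SeqAccumulator[T]

  override def add(v: T): Unit = buf += v

  override def merge(other: SketchAccumulator[T, Seq[T]]): Unit = other match {
    case o: SeqAccumulator[T] => buf ++= o.localValue
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // Return an immutable snapshot so callers cannot mutate the buffer.
  override def localValue: Seq[T] = buf.toList
}

object AccumulatorSketch {
  // Simulates two tasks updating their own copies, then the driver merging them.
  def demo(): Seq[Int] = {
    val driver = new SeqAccumulator[Int]
    val task1 = driver.copyAndReset()
    task1.add(1)
    task1.add(2)
    val task2 = driver.copyAndReset()
    task2.add(3)
    driver.merge(task1)
    driver.merge(task2)
    driver.localValue
  }
}
```

In this sketch each task works on a zero-valued copy and the driver only ever sees updates through `merge`. That is why the input type and the result type can differ, and, per the PR description, why a general `setValue` is hard to provide.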

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b7fb608..a96d5f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -209,7 +209,7 @@ class DAGScheduler(
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
-      accumUpdates: Seq[AccumulableInfo],
+      accumUpdates: Seq[NewAccumulator[_, _]],
       taskInfo: TaskInfo): Unit = {
     eventProcessLoop.post(
       CompletionEvent(task, reason, result, accumUpdates, taskInfo))
@@ -1088,21 +1088,19 @@ class DAGScheduler(
     val task = event.task
     val stage = stageIdToStage(task.stageId)
     try {
-      event.accumUpdates.foreach { ainfo =>
-        assert(ainfo.update.isDefined, "accumulator from task should have a partial value")
-        val id = ainfo.id
-        val partialValue = ainfo.update.get
+      event.accumUpdates.foreach { updates =>
+        val id = updates.id
         // Find the corresponding accumulator on the driver and update it
-        val acc: Accumulable[Any, Any] = Accumulators.get(id) match {
-          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
+        val acc: NewAccumulator[Any, Any] = AccumulatorContext.get(id) match {
+          case Some(accum) => accum.asInstanceOf[NewAccumulator[Any, Any]]
           case None =>
             throw new SparkException(s"attempted to access non-existent accumulator $id")
         }
-        acc ++= partialValue
+        acc.merge(updates.asInstanceOf[NewAccumulator[Any, Any]])
         // To avoid UI cruft, ignore cases where value wasn't updated
-        if (acc.name.isDefined && partialValue != acc.zero) {
+        if (acc.name.isDefined && !updates.isZero()) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
-          event.taskInfo.accumulables += acc.toInfo(Some(partialValue), Some(acc.value))
+          event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
         }
       }
     } catch {
@@ -1131,7 +1129,7 @@ class DAGScheduler(
     val taskMetrics: TaskMetrics =
       if (event.accumUpdates.nonEmpty) {
         try {
-          TaskMetrics.fromAccumulatorUpdates(event.accumUpdates)
+          TaskMetrics.fromAccumulators(event.accumUpdates)
         } catch {
           case NonFatal(e) =>
             logError(s"Error when attempting to reconstruct metrics for task 
$taskId", e)
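
Reviewer note: the driver-side flow in the hunk at line 1088 (look up the accumulator by id, `merge` the task's copy into it, and skip zero-valued updates when recording UI info) can be sketched on its own. This is an illustration under stated assumptions, not Spark code: `SketchLongAcc` and `DriverMergeSketch` are hypothetical, and the mutable map stands in for `AccumulatorContext`.

```scala
import scala.collection.mutable

// Hypothetical, simplified long accumulator; not Spark's LongAccumulator.
final class SketchLongAcc(val id: Long, val name: Option[String]) {
  private var sum = 0L
  def isZero: Boolean = sum == 0L
  def add(v: Long): Unit = sum += v
  def merge(other: SketchLongAcc): Unit = sum += other.value
  def value: Long = sum
}

object DriverMergeSketch {
  // Stand-in for AccumulatorContext: driver-side accumulators keyed by id.
  private val registry = mutable.Map.empty[Long, SketchLongAcc]

  def register(acc: SketchLongAcc): Unit = registry.update(acc.id, acc)

  // Merges task-side updates into the driver copies. Returns (id -> merged value)
  // only for named accumulators whose update was non-zero, which is the condition
  // the diff uses to avoid UI cruft.
  def applyUpdates(updates: Seq[SketchLongAcc]): Map[Long, Long] = {
    val shown = mutable.Map.empty[Long, Long]
    updates.foreach { update =>
      val acc = registry.getOrElse(
        update.id,
        throw new IllegalStateException(
          s"attempted to access non-existent accumulator ${update.id}"))
      acc.merge(update)
      if (acc.name.isDefined && !update.isZero) {
        shown(update.id) = acc.value
      }
    }
    shown.toMap
  }
}
```

Note that the merge happens unconditionally. Only the bookkeeping for display is skipped when the update is zero or the accumulator is unnamed.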

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index a3845c6..e57a224 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -71,7 +71,7 @@ private[scheduler] case class CompletionEvent(
     task: Task[_],
     reason: TaskEndReason,
     result: Any,
-    accumUpdates: Seq[AccumulableInfo],
+    accumUpdates: Seq[NewAccumulator[_, _]],
     taskInfo: TaskInfo)
   extends DAGSchedulerEvent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 080ea6c..7618dfe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -21,18 +21,15 @@ import java.util.Properties
 import javax.annotation.Nullable
 
 import scala.collection.Map
-import scala.collection.mutable
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo
 
 import org.apache.spark.{SparkConf, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{Distribution, Utils}
 
 @DeveloperApi
 @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 02185bf..2f972b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -112,7 +112,7 @@ private[scheduler] abstract class Stage(
       numPartitionsToCompute: Int,
       taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
     val metrics = new TaskMetrics
-    metrics.registerForCleanup(rdd.sparkContext)
+    metrics.register(rdd.sparkContext)
     _latestInfo = StageInfo.fromStage(
       this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
     nextAttemptId += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index eb10f3e..e7ca6ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
+import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
@@ -52,7 +52,7 @@ private[spark] abstract class Task[T](
     val stageAttemptId: Int,
     val partitionId: Int,
     // The default value is only used in tests.
-    val metrics: TaskMetrics = TaskMetrics.empty,
+    val metrics: TaskMetrics = TaskMetrics.registered,
     @transient var localProperties: Properties = new Properties) extends Serializable {
 
   /**
@@ -153,11 +153,11 @@ private[spark] abstract class Task[T](
    * Collect the latest values of accumulators used in this task. If the task failed,
    * filter out the accumulators whose values should not be included on failures.
    */
-  def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo] = {
+  def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[NewAccumulator[_, _]] = {
     if (context != null) {
-      context.taskMetrics.accumulatorUpdates().filter { a => !taskFailed || a.countFailedValues }
+      context.taskMetrics.accumulators().filter { a => !taskFailed || a.countFailedValues }
     } else {
-      Seq.empty[AccumulableInfo]
+      Seq.empty
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 03135e6..b472c55 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{NewAccumulator, SparkEnv}
 import org.apache.spark.storage.BlockId
 import org.apache.spark.util.Utils
 
@@ -36,7 +36,7 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
 /** A TaskResult that contains the task's return value and accumulator updates. */
 private[spark] class DirectTaskResult[T](
     var valueBytes: ByteBuffer,
-    var accumUpdates: Seq[AccumulableInfo])
+    var accumUpdates: Seq[NewAccumulator[_, _]])
   extends TaskResult[T] with Externalizable {
 
   private var valueObjectDeserialized = false
@@ -61,9 +61,9 @@ private[spark] class DirectTaskResult[T](
     if (numUpdates == 0) {
       accumUpdates = null
     } else {
-      val _accumUpdates = new ArrayBuffer[AccumulableInfo]
+      val _accumUpdates = new ArrayBuffer[NewAccumulator[_, _]]
       for (i <- 0 until numUpdates) {
-        _accumUpdates += in.readObject.asInstanceOf[AccumulableInfo]
+        _accumUpdates += in.readObject.asInstanceOf[NewAccumulator[_, _]]
       }
       accumUpdates = _accumUpdates
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index ae7ef46..b438c28 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -93,9 +93,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
           // we would have to serialize the result again after updating the size.
           result.accumUpdates = result.accumUpdates.map { a =>
             if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
-              assert(a.update == Some(0L),
-                "task result size should not have been set on the executors")
-              a.copy(update = Some(size.toLong))
+              val acc = a.asInstanceOf[LongAccumulator]
+              assert(acc.sum == 0L, "task result size should not have been set on the executors")
+              acc.setValue(size.toLong)
+              acc
             } else {
               a
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 647d44a..75a0c56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import org.apache.spark.NewAccumulator
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 
@@ -66,7 +67,7 @@ private[spark] trait TaskScheduler {
    */
   def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
       blockManagerId: BlockManagerId): Boolean
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f31ec2a..776a322 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -384,13 +384,14 @@ private[spark] class TaskSchedulerImpl(
    */
   override def executorHeartbeatReceived(
       execId: String,
-      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
       blockManagerId: BlockManagerId): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
       accumUpdates.flatMap { case (id, updates) =>
         taskIdToTaskSetManager.get(id).map { taskSetMgr =>
-          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, updates)
+          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId,
+            updates.map(acc => acc.toInfo(Some(acc.value), None)))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6e08cdd..b79f643 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -647,7 +647,7 @@ private[spark] class TaskSetManager(
     info.markFailed()
     val index = info.index
     copiesRunning(index) -= 1
-    var accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]
+    var accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty
     val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
       reason.asInstanceOf[TaskFailedReason].toErrorString
     val failureException: Option[Throwable] = reason match {
@@ -663,7 +663,7 @@ private[spark] class TaskSetManager(
 
       case ef: ExceptionFailure =>
         // ExceptionFailure's might have accumulator updates
-        accumUpdates = ef.accumUpdates
+        accumUpdates = ef.accums
         if (ef.className == classOf[NotSerializableException].getName) {
           // If the task result wasn't serializable, there's no point in trying to re-execute it.
           logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
@@ -788,7 +788,7 @@ private[spark] class TaskSetManager(
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
           sched.dagScheduler.taskEnded(
-            tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
+            tasks(index), Resubmitted, null, Seq.empty, info)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
index 8daca6c..c04b483 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
@@ -266,7 +266,13 @@ private[spark] object SerializationDebugger extends Logging {
       (o, desc)
     } else {
       // write place
-      findObjectAndDescriptor(desc.invokeWriteReplace(o))
+      val replaced = desc.invokeWriteReplace(o)
+      // `writeReplace` may return the same object.
+      if (replaced eq o) {
+        (o, desc)
+      } else {
+        findObjectAndDescriptor(replaced)
+      }
     }
   }
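
Reviewer note: the identity check added to `findObjectAndDescriptor` above is a fixed-point guard. If `writeReplace` hands back the very same object, recursing on it again would never terminate. A minimal sketch of that guard follows. `Replaceable`, `Proxied`, `SelfReplacing` and `resolve` are hypothetical names used only for illustration and do not model `ObjectStreamClass`.

```scala
import scala.annotation.tailrec

object WriteReplaceSketch {
  // Hypothetical stand-in for an object exposing a writeReplace-style hook.
  trait Replaceable {
    def replacement: Replaceable
  }

  // Replaces itself with another object (for example, a serialization proxy).
  final class Proxied(proxy: Replaceable) extends Replaceable {
    def replacement: Replaceable = proxy
  }

  // Returns itself, which the diff comment notes `writeReplace` may do.
  final class SelfReplacing extends Replaceable {
    def replacement: Replaceable = this
  }

  // Follows replacements until an object replaces itself. The reference
  // equality check (`eq`) is what stops the recursion at that fixed point.
  @tailrec
  def resolve(o: Replaceable): Replaceable = {
    val replaced = o.replacement
    if (replaced eq o) o else resolve(replaced)
  }
}
```

Reference equality (`eq`) rather than `==` is used here, as in the patch, because the question is whether the same instance came back, not whether an equal one did.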
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index ff28796..32e332a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -186,8 +186,8 @@ class OutputMetrics private[spark](
     val recordsWritten: Long)
 
 class ShuffleReadMetrics private[spark](
-    val remoteBlocksFetched: Int,
-    val localBlocksFetched: Int,
+    val remoteBlocksFetched: Long,
+    val localBlocksFetched: Long,
     val fetchWaitTime: Long,
     val remoteBytesRead: Long,
     val localBytesRead: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/bf5496db/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1c49216..f2d06c7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -801,7 +801,7 @@ private[spark] class BlockManager(
           reportBlockStatus(blockId, info, putBlockStatus)
         }
         Option(TaskContext.get()).foreach { c =>
-          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+          c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
         }
       }
       logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
@@ -958,7 +958,7 @@ private[spark] class BlockManager(
           reportBlockStatus(blockId, info, putBlockStatus)
         }
         Option(TaskContext.get()).foreach { c =>
-          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+          c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
         }
         logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
         if (level.replication > 1) {
@@ -1257,7 +1257,7 @@ private[spark] class BlockManager(
     }
     if (blockIsUpdated) {
       Option(TaskContext.get()).foreach { c =>
-        c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
+        c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
       }
     }
     status.storageLevel
@@ -1311,7 +1311,7 @@ private[spark] class BlockManager(
           reportBlockStatus(blockId, info, removeBlockStatus)
         }
         Option(TaskContext.get()).foreach { c =>
-          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, removeBlockStatus)))
+          c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
         }
     }
   }

