Sergey Zhemzhitsky created SPARK-24154:
------------------------------------------

             Summary: AccumulatorV2 loses type information during serialization
                 Key: SPARK-24154
                 URL: https://issues.apache.org/jira/browse/SPARK-24154
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.3.1
         Environment: Scala 2.11
Spark 2.2.0
            Reporter: Sergey Zhemzhitsky


AccumulatorV2 loses type information during serialization.
It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during the *writeReplace* call:
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because users may need to
      // access the accumulator name at executor side, or they may keep the accumulators sent from
      // executors and access the name when the registered accumulator is already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

As a result, it is hardly possible to create new accumulators by adding behaviour to existing ones via mix-ins or inheritance (without also overriding *copy*): the copy that *writeReplace* ships to the executors is produced by *copyAndReset*, which for the built-in accumulators boils down to *copy* constructing a plain instance of the base class, so any mixed-in trait is dropped.
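For illustration, here is roughly what happens under the hood (paraphrased from the Spark 2.x sources; the exact bodies may differ between versions):

{code:scala}
// AccumulatorV2: copyAndReset delegates to copy()
def copyAndReset(): AccumulatorV2[IN, OUT] = {
  val copyAcc = copy()
  copyAcc.reset()
  copyAcc
}

// LongAccumulator: copy() is hard-coded to the base class, so a
// "new LongAccumulator with TripleCount" comes back without the mix-in
override def copy(): LongAccumulator = {
  val newAcc = new LongAccumulator
  newAcc._count = this._count
  newAcc._sum = this._sum
  newAcc
}
{code}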

For example, the following snippet ...
{code:scala}
import java.{lang => jl}

// Stackable modification of LongAccumulator: triple every added value.
// (The trait has to extend LongAccumulator and call super.add; calling
// self.add through a self-type would dispatch back to the mix-in itself.)
trait TripleCount extends LongAccumulator {
  override def add(v: jl.Long): Unit = super.add(v * 3)
  // Cover the primitive overload too, which rdd.foreach below resolves to.
  override def add(v: Long): Unit = super.add(v * 3)
}

val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails, because on the executors the deserialized accumulator is a plain *LongAccumulator* and the values are added untripled (1 + 2 + ... + 10 = 55 instead of 165):

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}
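A possible workaround (a sketch only, not verified against every affected version; the class name *TripleLongAccumulator* is made up) is to keep the behaviour in a named subclass and override the copy machinery there, so the dynamic type survives *writeReplace*:

{code:scala}
// Hypothetical workaround: preserve the dynamic type when copying.
class TripleLongAccumulator extends LongAccumulator with TripleCount {
  override def copyAndReset(): TripleLongAccumulator = new TripleLongAccumulator

  override def copy(): TripleLongAccumulator = {
    val newAcc = new TripleLongAccumulator
    newAcc.merge(this) // newAcc starts at zero, so merging copies sum and count
    newAcc
  }
}
{code}

With *val acc = new TripleLongAccumulator* registered instead, the copy shipped to the executors keeps the tripling behaviour.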


