[ 
https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-24154:
---------------------------------------
    Description: 
AccumulatorV2 loses type information during serialization.
It happens 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164]
 during *writeReplace* call
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && 
name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to 
executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because 
users may need to
      // access the accumulator name at executor side, or they may keep the 
accumulators sent from
      // executors and access the name when the registered accumulator is 
already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

It means that it is hardly possible to create new accumulators easily by adding 
new behaviour to existing ones by means of mix-ins or inheritance (without 
overriding *copy*).

For example the following snippet ...
{code:scala}
trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
    self.add(v * 3)
  }
}
val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails with

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}

Also such a behaviour seems to be error prone and confusing because an 
implementor gets not the same thing as he/she sees in the code.

  was:
AccumulatorV2 loses type information during serialization.
It happens 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164]
 during *writeReplace* call
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && 
name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to 
executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because 
users may need to
      // access the accumulator name at executor side, or they may keep the 
accumulators sent from
      // executors and access the name when the registered accumulator is 
already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

It means that it is hardly possible to create new accumulators easily by adding 
new behaviour to existing ones by means of mix-ins or inheritance (without 
overriding *copy*).

For example the following snippet ...
{code:scala}
trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
    self.add(v * 3)
  }
}
val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails with

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}


> AccumulatorV2 loses type information during serialization
> ---------------------------------------------------------
>
>                 Key: SPARK-24154
>                 URL: https://issues.apache.org/jira/browse/SPARK-24154
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1
>         Environment: Scala 2.11
> Spark 2.2.0
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>
> AccumulatorV2 loses type information during serialization.
> It happens 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164]
>  during *writeReplace* call
> {code:scala}
> final protected def writeReplace(): Any = {
>   if (atDriverSide) {
>     if (!isRegistered) {
>       throw new UnsupportedOperationException(
>         "Accumulator must be registered before send to executor")
>     }
>     val copyAcc = copyAndReset()
>     assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
>     val isInternalAcc = name.isDefined && 
> name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
>     if (isInternalAcc) {
>       // Do not serialize the name of internal accumulator and send it to 
> executor.
>       copyAcc.metadata = metadata.copy(name = None)
>     } else {
>       // For non-internal accumulators, we still need to send the name 
> because users may need to
>       // access the accumulator name at executor side, or they may keep the 
> accumulators sent from
>       // executors and access the name when the registered accumulator is 
> already garbage
>       // collected(e.g. SQLMetrics).
>       copyAcc.metadata = metadata
>     }
>     copyAcc
>   } else {
>     this
>   }
> }
> {code}
> It means that it is hardly possible to create new accumulators easily by 
> adding new behaviour to existing ones by means of mix-ins or inheritance 
> (without overriding *copy*).
> For example the following snippet ...
> {code:scala}
> trait TripleCount {
>   self: LongAccumulator =>
>   abstract override def add(v: jl.Long): Unit = {
>     self.add(v * 3)
>   }
> }
> val acc = new LongAccumulator with TripleCount
> sc.register(acc)
> val data = 1 to 10
> val rdd = sc.makeRDD(data, 5)
> rdd.foreach(acc.add(_))
> acc.value shouldBe 3 * data.sum
> {code}
> ... fails with
> {code:none}
> org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
>   at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
>   at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
> {code}
> Also such a behaviour seems to be error prone and confusing because an 
> implementor gets not the same thing as he/she sees in the code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to