[ https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergey Zhemzhitsky updated SPARK-24154:
---------------------------------------
    Description: 
AccumulatorV2 loses type information during serialization.

It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during the *writeReplace* call:

{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined &&
      name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because users may need to
      // access the accumulator name at executor side, or they may keep the accumulators sent from
      // executors and access the name when the registered accumulator is already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

Because the copy sent to the executors is produced by *copyAndReset*, which for the built-in accumulators returns a fresh instance of the base class, it is hardly possible to create new accumulators by adding behaviour to existing ones by means of mix-ins or inheritance (without also overriding *copyAndReset* or *copy*).

For example, the following snippet ...

{code:scala}
trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
    self.add(v * 3)
  }
}

val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails with

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}

Such behaviour is also error-prone and confusing, because at runtime an implementor gets a different accumulator than the one defined in the code.
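A possible workaround (not part of the original report; the class name {{TripleCountAccumulator}} is illustrative) is to use a named subclass that also overrides *copyAndReset*, so that the copy produced by *writeReplace* keeps the subclass type. A minimal sketch against the public Spark 2.x accumulator API:

{code:scala}
import java.{lang => jl}
import org.apache.spark.util.LongAccumulator

class TripleCountAccumulator extends LongAccumulator {
  // Triple every added value; both overloads are overridden so that
  // boxed and unboxed calls behave the same way.
  override def add(v: jl.Long): Unit = super.add(v * 3)
  override def add(v: Long): Unit = super.add(v * 3)

  // writeReplace() serializes the result of copyAndReset(), so returning
  // the subclass here is what preserves the overridden add() on executors.
  override def copyAndReset(): TripleCountAccumulator = new TripleCountAccumulator
}

val acc = new TripleCountAccumulator
sc.register(acc)
sc.makeRDD(1 to 10, 5).foreach(acc.add(_))
// acc.value is now 165: the executor-side copies are TripleCountAccumulator
// instances rather than plain LongAccumulators, so the tripling survives.
{code}

This only sidesteps the issue for subclasses that override *copyAndReset* themselves; the anonymous {{new LongAccumulator with TripleCount}} mix-in from the report is still silently replaced by a plain *LongAccumulator*.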
> AccumulatorV2 loses type information during serialization
> ----------------------------------------------------------
>
>                 Key: SPARK-24154
>                 URL: https://issues.apache.org/jira/browse/SPARK-24154
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1
>         Environment: Scala 2.11
>                      Spark 2.2.0
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org