[jira] [Updated] (SPARK-23187) Accumulator object can not be sent from Executor to Driver

2018-01-22 Thread Saisai Shao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saisai Shao updated SPARK-23187:

Affects Version/s: 2.3.0 (was: 2.3.1)

> Accumulator object can not be sent from Executor to Driver
> --
>
> Key: SPARK-23187
> URL: https://issues.apache.org/jira/browse/SPARK-23187
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1, 2.3.0
>Reporter: Lantao Jin
>Priority: Major
>
> In Executor.scala's reportHeartBeat(), task metrics values cannot be sent to 
> the driver (on the receiving side, all values are zero).
> I wrote a unit test to demonstrate the problem:
> {code}
> diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> index f9481f8..57fb096 100644
> --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> @@ -17,11 +17,16 @@
>  package org.apache.spark.rpc.netty
> +import scala.collection.mutable.ArrayBuffer
> +
>  import org.scalatest.mockito.MockitoSugar
>  import org.apache.spark._
>  import org.apache.spark.network.client.TransportClient
>  import org.apache.spark.rpc._
> +import org.apache.spark.util.AccumulatorContext
> +import org.apache.spark.util.AccumulatorV2
> +import org.apache.spark.util.LongAccumulator
>  class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
> @@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
>  assertRequestMessageEquals(
>msg3,
>RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
> +
> +val acc = new LongAccumulator
> +val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("testAcc"))
> +sc.register(acc, "testAcc")
> +acc.setValue(1)
> +//val msg4 = new RequestMessage(senderAddress, receiver, acc)
> +//assertRequestMessageEquals(
> +//  msg4,
> +//  RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
> +
> +val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
> +accbuf += acc
> +val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
> +assertRequestMessageEquals(
> +  msg5,
> +  RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
>}
>  }
> {code}
> Both the msg4 and msg5 cases fail.
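For readers wondering how a non-zero value can round-trip through serialization and come out as zero: one plausible mechanism (a generic Java sketch, not Spark's actual AccumulatorV2 implementation; `DemoAccumulator` and `roundTrip` are hypothetical names) is a `writeReplace` hook that substitutes a reset copy into the serialization stream, so the receiver never sees the sender's value:

```java
import java.io.*;

// Hypothetical accumulator-like class: a generic sketch of how a
// writeReplace hook can zero out a value during Java serialization.
// This is NOT Spark's AccumulatorV2, only an illustration of the kind
// of mechanism that makes values arrive as zero on the receiving side.
class DemoAccumulator implements Serializable {
    long value;

    DemoAccumulator(long v) { value = v; }

    // Java serialization invokes writeReplace before writing the object;
    // returning a zeroed copy means the real value never leaves the sender.
    private Object writeReplace() throws ObjectStreamException {
        return new DemoAccumulator(0L);
    }
}

public class SerializationResetDemo {
    // Serialize and deserialize the accumulator, returning the received value.
    static long roundTrip(DemoAccumulator acc) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(acc);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return ((DemoAccumulator) ois.readObject()).value;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(new DemoAccumulator(1L)));  // prints 0
    }
}
```

Whatever the actual cause inside the NettyRpcEnv serialization path turns out to be, a round-trip assertion like the unit test above is the right shape of regression test for it.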



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23187) Accumulator object can not be sent from Executor to Driver

2018-01-22 Thread Lantao Jin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lantao Jin updated SPARK-23187:
---
Affects Version/s: 2.3.1



