[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578987#comment-15578987 ]

ding edited comment on SPARK-17951 at 10/16/16 12:22 AM:
---------------------------------------------------------

I tried calling rdd.collect, which internally calls bm.getRemoteBytes from 
multiple threads. The data shows the difference in time spent on block fetch is 
also around 0.1s between Spark 1.5.1 and Spark 1.6.2, and it can be reproduced 
consistently.

In our scenario, we call getRemoteBytes twice per task. With Spark 1.6.2 this 
adds roughly 0.2s of overhead and causes around a 5% decrease in throughput. We 
would like to reduce the overhead if possible.
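
For reference, a minimal sketch of that access pattern (two getRemoteBytes calls 
for the same block, timed separately); the block id is a placeholder assumption, 
not the exact value from our job, and in the real job this runs inside 
mapPartitions as in the code below:

{code}
// Sketch only: time two consecutive getRemoteBytes calls for one block.
// Assumes the block was already stored on another executor.
import org.apache.spark.SparkEnv
import org.apache.spark.storage.TaskResultBlockId

val bm = SparkEnv.get.blockManager
val blockId = TaskResultBlockId(42)   // hypothetical block id

def timedFetch(): Double = {
  val start = System.nanoTime()
  bm.getRemoteBytes(blockId)
  (System.nanoTime() - start) / 1e9
}

println("first fetch:  " + timedFetch() + " s")
println("second fetch: " + timedFetch() + " s")
{code}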



> BlockFetch with multiple threads slows down after spark 1.6
> -----------------------------------------------------------
>
>                 Key: SPARK-17951
>                 URL: https://issues.apache.org/jira/browse/SPARK-17951
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 1.6.2
>         Environment: cluster with 8 nodes, each with 28 cores; 10Gb network
>            Reporter: ding
>
> The following code demonstrates the issue:
> {code}
> import java.nio.ByteBuffer
>
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.util.Random
>
> import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
> import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
>
> object BMTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("BMTest")
>     val size = 3344570
>     val sc = new SparkContext(conf)
>     val data = sc.parallelize(1 to 100, 8)
>     var accum = sc.accumulator(0.0, "get remote bytes")
>     var i = 0
>     while (i < 91) {
>       accum = sc.accumulator(0.0, "get remote bytes")
>       // Stage 1: every partition stores one ~3.3 MB block of random bytes
>       // in its local block manager.
>       data.mapPartitionsWithIndex { (pid, iter) =>
>         val N = size
>         val bm = SparkEnv.get.blockManager
>         val blockId = TaskResultBlockId(10 * i + pid)
>         val test = new Array[Byte](N)
>         Random.nextBytes(test)
>         val buffer = ByteBuffer.allocate(N)
>         buffer.limit(N)
>         buffer.put(test)
>         bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
>         Iterator(1)
>       }.count()
>
>       // Stage 2: every partition fetches all 8 blocks through 8 concurrent
>       // futures and accumulates the elapsed wall-clock time.
>       data.mapPartitionsWithIndex { (pid, iter) =>
>         val before = System.nanoTime()
>         val bm = SparkEnv.get.blockManager
>         (0 to 7).map { s =>
>           Future {
>             bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
>           }
>         }.map(Await.result(_, Duration.Inf))
>         accum.add((System.nanoTime() - before) / 1e9)
>         Iterator(1)
>       }.count()
>       println("get remote bytes take: " + accum.value / 8)
>       i += 1
>     }
>   }
> }
> {code}
> In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while in
> Spark 1.5.1 it is 0.09 s.
> However, if the blocks are fetched in a single thread, the gap is much smaller:
> Spark 1.6.2: get remote bytes 0.21 s
> Spark 1.5.1: get remote bytes 0.20 s
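> For comparison, a possible single-threaded variant of the fetch stage, as a
> drop-in replacement for the body of the second mapPartitionsWithIndex above
> (same setup; only the Future wrapper is removed). This is a sketch, not
> necessarily the exact code used for the single-thread measurement:
> {code}
> // Single-threaded variant: fetch the 8 blocks one after another instead of
> // from 8 concurrent futures.
> val bm = SparkEnv.get.blockManager
> val before = System.nanoTime()
> (0 to 7).foreach { s =>
>   bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
> }
> accum.add((System.nanoTime() - before) / 1e9)
> {code}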


