[jira] [Updated] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6

2016-10-15 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-17951:
--
Description: 
The following code demonstrates the issue:

{code}
import java.nio.ByteBuffer

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}

object BMTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BMTest")
    val size = 3344570
    val sc = new SparkContext(conf)

    val data = sc.parallelize(1 to 100, 8)
    var accum = sc.accumulator(0.0, "get remote bytes")
    var i = 0
    while (i < 91) {
      accum = sc.accumulator(0.0, "get remote bytes")

      // Each of the 8 partitions stores one ~3.3 MB block of random bytes
      // in its local BlockManager.
      data.mapPartitionsWithIndex { (pid, iter) =>
        val N = size
        val bm = SparkEnv.get.blockManager
        val blockId = TaskResultBlockId(10 * i + pid)
        val bytes = new Array[Byte](N)
        Random.nextBytes(bytes)
        val buffer = ByteBuffer.allocate(N)
        buffer.limit(N)
        buffer.put(bytes)
        bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
        Iterator(1)
      }.count()

      // Each partition then fetches all 8 blocks of this round concurrently
      // (one Future per block) and records the elapsed wall-clock time.
      data.mapPartitionsWithIndex { (pid, iter) =>
        val before = System.nanoTime()
        val bm = SparkEnv.get.blockManager
        (0 to 7).map { s =>
          Future {
            bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
          }
        }.map(Await.result(_, Duration.Inf))

        accum.add((System.nanoTime() - before) / 1e9)
        Iterator(1)
      }.count()

      // accum sums the per-partition fetch times over the 8 partitions.
      println("get remote bytes take: " + accum.value / 8)
      i += 1
    }
  }
}
{code}
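
For reference, the repro can be launched with spark-submit; the jar name and master URL below are placeholders (assuming the object above is packaged into an application jar):

{code}
spark-submit --class BMTest --master spark://<master-host>:7077 bmtest.jar
{code}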


In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while in Spark 1.5.1 it is 0.09 s.

However, if the blocks are fetched in a single thread, the gap is much smaller:
Spark 1.6.2  get remote bytes: 0.21 s
Spark 1.5.1  get remote bytes: 0.20 s
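
For comparison, a minimal sketch (not part of the original report) of the single-threaded variant of the fetch step, reusing the same {{data}}, {{accum}} and {{i}} as above, with the Future-based fetches replaced by a sequential loop:

{code}
// Single-threaded variant: each partition fetches the 8 blocks of this round
// sequentially instead of through 8 concurrent Futures.
data.mapPartitionsWithIndex { (pid, iter) =>
  val before = System.nanoTime()
  val bm = SparkEnv.get.blockManager
  (0 to 7).foreach { s =>
    bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
  }
  accum.add((System.nanoTime() - before) / 1e9)
  Iterator(1)
}.count()
{code}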



  was:
The following code demonstrates the issue:
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(s"BMTest")
  val size = 3344570
  val sc = new SparkContext(conf)

  val data = sc.parallelize(1 to 100, 8)
  var accum = sc.accumulator(0.0, "get remote bytes")
  var i = 0
  while(i < 91) {
    accum = sc.accumulator(0.0, "get remote bytes")
    val test = data.mapPartitionsWithIndex { (pid, iter) =>
      val N = size
      val bm = SparkEnv.get.blockManager
      val blockId = TaskResultBlockId(10*i + pid)
      val test = new Array[Byte](N)
      Random.nextBytes(test)
      val buffer = ByteBuffer.allocate(N)
      buffer.limit(N)
      buffer.put(test)
      bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
      Iterator(1)
    }.count()

    data.mapPartitionsWithIndex { (pid, iter) =>
      val before = System.nanoTime()

      val bm = SparkEnv.get.blockManager
      (0 to 7).map(s => {
        Future {
          val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
        }
      }).map(Await.result(_, Duration.Inf))

      accum.add((System.nanoTime() - before) / 1e9)
      Iterator(1)
    }.count()
    println("get remote bytes take: " + accum.value/8)
    i += 1
  }
}

In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while in Spark 1.5.1 it is 0.09 s.

However, if the blocks are fetched in a single thread, the gap is much smaller:
Spark 1.6.2  get remote bytes: 0.21 s
Spark 1.5.1  get remote bytes: 0.20 s




> BlockFetch with multiple threads slows down after spark 1.6
> ---
>
> Key: SPARK-17951
> URL: https://issues.apache.org/jira/browse/SPARK-17951
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 1.6.2
> Environment: cluster with 8 nodes, each with 28 cores, 10Gb network
>Reporter: ding
>

[jira] [Updated] (SPARK-17951) BlockFetch with multiple threads slows down after spark 1.6

2016-10-14 Thread ding (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ding updated SPARK-17951:
-
Description: 
The following code demonstrates the issue:
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(s"BMTest")
  val size = 3344570
  val sc = new SparkContext(conf)

  val data = sc.parallelize(1 to 100, 8)
  var accum = sc.accumulator(0.0, "get remote bytes")
  var i = 0
  while(i < 91) {
    accum = sc.accumulator(0.0, "get remote bytes")
    val test = data.mapPartitionsWithIndex { (pid, iter) =>
      val N = size
      val bm = SparkEnv.get.blockManager
      val blockId = TaskResultBlockId(10*i + pid)
      val test = new Array[Byte](N)
      Random.nextBytes(test)
      val buffer = ByteBuffer.allocate(N)
      buffer.limit(N)
      buffer.put(test)
      bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
      Iterator(1)
    }.count()

    data.mapPartitionsWithIndex { (pid, iter) =>
      val before = System.nanoTime()

      val bm = SparkEnv.get.blockManager
      (0 to 7).map(s => {
        Future {
          val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
        }
      }).map(Await.result(_, Duration.Inf))

      accum.add((System.nanoTime() - before) / 1e9)
      Iterator(1)
    }.count()
    println("get remote bytes take: " + accum.value/8)
    i += 1
  }
}

In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while in Spark 1.5.1 it is 0.09 s.

However, if the blocks are fetched in a single thread, the gap is much smaller:
Spark 1.6.2  get remote bytes: 0.21 s
Spark 1.5.1  get remote bytes: 0.20 s



  was:
The following code demonstrates the issue:
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(s"BMTest")
  val size = 3344570
  val sc = new SparkContext(conf)

  val data = sc.parallelize(1 to 100, 8)
  var accum = sc.accumulator(0.0, "get remote bytes")
  var i = 0
  while(i < 91) {
    accum = sc.accumulator(0.0, "get remote bytes")
    val test = data.mapPartitionsWithIndex { (pid, iter) =>
      val N = size
      val bm = SparkEnv.get.blockManager
      val blockId = TaskResultBlockId(10*i + pid)
      val test = new Array[Byte](N)
      Random.nextBytes(test)
      val buffer = ByteBuffer.allocate(N)
      buffer.limit(N)
      buffer.put(test)
      bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
      Iterator(1)
    }.count()

    data.mapPartitionsWithIndex { (pid, iter) =>
      val before = System.nanoTime()
      val bm = SparkEnv.get.blockManager
      (1 to 8).map(s => {
        Future {
          val result = bm.getRemoteBytes(TaskResultBlockId(10*i + s))
        }
      }).map(Await.result(_, Duration.Inf))

      accum.add((System.nanoTime() - before) / 1e9)
      Iterator(1)
    }.count()
    println("get remote bytes take: " + accum.value/8)
    i += 1
  }
}

In Spark 1.6.2 the average "get remote bytes" time is 0.16 s, while in Spark 1.5.1 it is 0.07 s.

However, if the blocks are fetched in a single thread, the gap is much smaller:
Spark 1.6.2  get remote bytes: 0.191421 s
Spark 1.5.1  get remote bytes: 0.181312 s




> BlockFetch with multiple threads slows down after spark 1.6
> ---
>
> Key: SPARK-17951
> URL: https://issues.apache.org/jira/browse/SPARK-17951
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 1.6.2
> Environment: cluster with 8 nodes, each with 28 cores, 10Gb network
>Reporter: ding
>