[jira] [Updated] (SPARK-17951) BlockFetch with multiple threads slows down after Spark 1.6
[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-17951:
------------------------------
    Description:

The following code demonstrates the issue:

{code}
// Imports needed to compile the repro (inferred; not part of the original report):
import java.nio.ByteBuffer

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(s"BMTest")
  val size = 3344570
  val sc = new SparkContext(conf)
  val data = sc.parallelize(1 to 100, 8)
  var accum = sc.accumulator(0.0, "get remote bytes")
  var i = 0
  while (i < 91) {
    accum = sc.accumulator(0.0, "get remote bytes")

    // Stage 1: each of the 8 tasks writes one ~3.3 MB block of random bytes
    // into its local BlockManager.
    val test = data.mapPartitionsWithIndex { (pid, iter) =>
      val N = size
      val bm = SparkEnv.get.blockManager
      val blockId = TaskResultBlockId(10 * i + pid)
      val test = new Array[Byte](N)
      Random.nextBytes(test)
      val buffer = ByteBuffer.allocate(N)
      buffer.limit(N)
      buffer.put(test)
      bm.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_SER)
      Iterator(1)
    }.count()

    // Stage 2: each task fetches all 8 blocks concurrently (mostly remote)
    // and records the wall-clock time of the fetches.
    data.mapPartitionsWithIndex { (pid, iter) =>
      val before = System.nanoTime()
      val bm = SparkEnv.get.blockManager
      (0 to 7).map(s => {
        Future {
          val result = bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
        }
      }).map(Await.result(_, Duration.Inf))
      accum.add((System.nanoTime() - before) / 1e9)
      Iterator(1)
    }.count()

    println("get remote bytes take: " + accum.value / 8)
    i += 1
  }
}
{code}

In Spark 1.6.2 the average "get remote bytes" time is 0.19 s, while in Spark 1.5.1 it is 0.09 s. However, if the blocks are fetched in a single thread, the gap is much smaller:

Spark 1.6.2 get remote bytes: 0.21 s
Spark 1.5.1 get remote bytes: 0.20 s

was: (the same text, without the {code} formatting)

> BlockFetch with multiple threads slows down after Spark 1.6
> ------------------------------------------------------------
>
>                 Key: SPARK-17951
>                 URL: https://issues.apache.org/jira/browse/SPARK-17951
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 1.6.2
>         Environment: cluster with 8 nodes, 28 cores per node, 10Gb network
>            Reporter: ding
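The report quotes single-threaded numbers, but the repro only shows the multi-threaded fetch stage. A minimal sketch of what the single-threaded variant presumably looks like, assuming it simply replaces the second mapPartitionsWithIndex block above (the sequential loop is the only change; data, accum, and i are the repro's own names):

{code}
// Hypothetical single-threaded fetch stage (assumed; not shown in the original
// report): the eight blocks are fetched one after another instead of through
// Futures.
data.mapPartitionsWithIndex { (pid, iter) =>
  val before = System.nanoTime()
  val bm = SparkEnv.get.blockManager
  (0 to 7).foreach { s =>
    bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
  }
  accum.add((System.nanoTime() - before) / 1e9)
  Iterator(1)
}.count()
{code}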
[jira] [Updated] (SPARK-17951) BlockFetch with multiple threads slows down after Spark 1.6
[ https://issues.apache.org/jira/browse/SPARK-17951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ding updated SPARK-17951:
-------------------------
    Description: (the repro shown in full in the update above: the fetch loop reads blocks (0 to 7), the multi-threaded "get remote bytes" average is 0.19 s on Spark 1.6.2 versus 0.09 s on Spark 1.5.1, and the single-threaded timings are 0.21 s versus 0.20 s)

was:

The earlier description was identical except that the fetch loop used (1 to 8):

{code}
(1 to 8).map(s => {
  Future {
    val result = bm.getRemoteBytes(TaskResultBlockId(10 * i + s))
  }
}).map(Await.result(_, Duration.Inf))
{code}

and the reported timings were: in Spark 1.6.2 the average "get remote bytes" time is 0.16 s, while in Spark 1.5.1 it is 0.07 s. However, if the blocks are fetched in a single thread, the gap is much smaller:

Spark 1.6.2 get remote bytes: 0.191421 s
Spark 1.5.1 get remote bytes: 0.181312 s
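The Futures in the repro run on scala.concurrent.ExecutionContext.Implicits.global, whose default pool size is typically the number of available processors (28 per node in this environment). A sketch, under the assumption that one wants the measured fetch parallelism to be explicit rather than inherited from the global pool, of a drop-in replacement for the body of the fetch stage (bm and i are the repro's names; the 8-thread pool size is an illustrative choice, not part of the original report):

{code}
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Run the eight fetches on an explicit fixed-size pool; passing the
// ExecutionContext explicitly avoids any clash with Implicits.global,
// which the repro already imports.
val pool = Executors.newFixedThreadPool(8)
val ec = ExecutionContext.fromExecutorService(pool)

val futures = (0 to 7).map { s =>
  Future(bm.getRemoteBytes(TaskResultBlockId(10 * i + s)))(ec)
}
futures.foreach(Await.result(_, Duration.Inf))
pool.shutdown()
{code}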