Re: [spark-streaming] can shuffle write to disk be disabled?
Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

> If the shuffle is so slow, you could enlarge the shuffle fraction of
> memory to reduce the spill, but finally the shuffle data will be written
> to disk; this cannot be disabled, unless you mount your spark.tmp.dir on
> ramdisk.

I have increased spark.shuffle.memoryFraction to 0.8, which I can see in SparkUI's environment variables. But spill always happens, even from the start, when latency is less than the slide window (I changed it to 10 seconds). The shuffle data written to disk is really a snowball effect: it slows things down eventually.

I noticed that the files spilled to disk are all very small in size but huge in number:

total 344K
drwxr-xr-x  2 root root 4.0K Mar 18 16:55 .
drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..
-rw-r--r--  1 root root  80K Mar 18 16:54 shuffle_47_519_0.data
-rw-r--r--  1 root root  75K Mar 18 16:54 shuffle_48_419_0.data
-rw-r--r--  1 root root  36K Mar 18 16:54 shuffle_48_518_0.data
-rw-r--r--  1 root root  69K Mar 18 16:55 shuffle_49_319_0.data
-rw-r--r--  1 root root  330 Mar 18 16:55 shuffle_49_418_0.data
-rw-r--r--  1 root root  65K Mar 18 16:55 shuffle_49_517_0.data

MemoryStore says:

15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far)
15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB.

Not enough space even for 512 bytes??

The executors still have plenty of free memory (columns: Executor ID, Address, RDD Blocks, Memory Used, Disk Used, Active/Failed/Complete/Total Tasks, Task Time, Input, Shuffle Read, Shuffle Write):

0       slave1:40778  0    0.0 B / 529.9 MB    0.0 B  16  0  15047  15063  2.17 h  0.0 B     402.3 MB  768.0 B
1       slave2:50452  0    0.0 B / 529.9 MB    0.0 B  16  0  14447  14463  2.17 h  0.0 B     388.8 MB  1248.0 B
1       lvs02:47325   116  27.6 MB / 529.9 MB  0.0 B  8   0  58169  58177  3.16 h  893.5 MB  624.0 B   1189.9 MB
driver  lvs02:47041   0    0.0 B / 529.9 MB    0.0 B  0   0  0      0      0 ms    0.0 B     0.0 B     0.0 B

> Besides if CPU or network is the bottleneck, you might need to add more
> resources to your cluster.

3 dedicated servers, each with 16 CPU cores + 16 GB memory, on a Gigabit network. CPU load is quite low, about 1-3 from top, and network usage is far from saturated. I don't even do any useful complex calculations in this small SimpleApp yet.
Re: [spark-streaming] can shuffle write to disk be disabled?
I think you can disable it with spark.shuffle.spill=false

Thanks
Best Regards

On Wed, Mar 18, 2015 at 3:39 PM, Darren Hoo <darren@gmail.com> wrote:

> I have increased spark.shuffle.memoryFraction to 0.8 [...] But spill
> always happens even from start [...] the shuffle data disk written is
> really a snowball effect, it slows down eventually.
Re: [spark-streaming] can shuffle write to disk be disabled?
I've already done that. From SparkUI Environment, Spark properties has:

spark.shuffle.spill  false

On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> I think you can disable it with spark.shuffle.spill=false
RE: [spark-streaming] can shuffle write to disk be disabled?
From the log you pasted, I think this (-rw-r--r-- 1 root root 80K Mar 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final shuffle result.

As I said, do you think shuffle is the bottleneck which makes your job run slowly? Maybe you should identify the cause first. Besides, from the log it looks like your memory is not enough to cache the data; maybe you should increase the memory size of the executor.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 6:41 PM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

> I've already done that. From SparkUI Environment, Spark properties has:
> spark.shuffle.spill  false
Re: [spark-streaming] can shuffle write to disk be disabled?
On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

> From the log you pasted I think this (-rw-r--r-- 1 root root 80K Mar 18
> 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final
> shuffle result.

Why is the shuffle result written to disk?

> As I said, did you think shuffle is the bottleneck which makes your job
> running slowly?

I am quite new to Spark, so I am just making wild guesses. What further information should I provide that can help find the real bottleneck?

> Maybe you should identify the cause at first. Besides from the log it
> looks your memory is not enough to cache the data, maybe you should
> increase the memory size of the executor.

Running two executors, the memory usage is quite low:

executor 0   8.6 MB / 4.1 GB
executor 1  23.9 MB / 4.1 GB
driver       0.0 B / 529.9 MB

Submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G
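[Editor's note: the 4.1 GB shown per executor is not the full 8 GB heap. In Spark 1.x only the storage fraction of the heap is available to the MemoryStore. A rough sketch of the arithmetic, assuming the 1.x defaults spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9 apply to this version:]

```scala
// Sketch: why an 8 GB executor heap shows only ~4 GB of storage memory
// in the Spark 1.x UI. The fractions below are the 1.x defaults (assumed).
object StoragePoolEstimate {
  def storagePoolMb(heapMb: Double,
                    memoryFraction: Double = 0.6, // spark.storage.memoryFraction
                    safetyFraction: Double = 0.9  // spark.storage.safetyFraction
                   ): Double =
    heapMb * memoryFraction * safetyFraction

  def main(args: Array[String]): Unit = {
    // 8192 MB * 0.6 * 0.9 ≈ 4424 MB ≈ 4.3 GB, in the ballpark of the
    // 4.1 GB the UI reports (Runtime.maxMemory is a bit below -Xmx8g).
    println(f"${storagePoolMb(8 * 1024) / 1024}%.1f GB")
  }
}
```

The same arithmetic on the 1 GB driver gives roughly the 529.9 MB storage limit seen in the MemoryStore log above.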
Re: [spark-streaming] can shuffle write to disk be disabled?
: Finished task 179.0 in stage 392.0 (TID 100613) in 292 ms on lvs02 (181/291)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 188.0 in stage 392.0 (TID 100622, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 182.0 in stage 392.0 (TID 100616) in 213 ms on lvs02 (182/291)
15/03/18 15:16:41 INFO BlockManagerInfo: Added input-0-1426663001400 in memory on lvs02:38954 (size: 24.4 KB, free: 1068.1 MB)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 189.0 in stage 392.0 (TID 100623, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 181.0 in stage 392.0 (TID 100615) in 286 ms on lvs02 (183/291)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 190.0 in stage 392.0 (TID 100624, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 183.0 in stage 392.0 (TID 100617) in 261 ms on lvs02 (184/291)

Any hints? Thanks!

On Wed, Mar 18, 2015 at 2:19 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

> Would you please check your driver log or streaming web UI to see each
> job's latency, including processing latency and total latency. [...] I
> think you need to identify the bottleneck at first, and then trying to
> tune your code, balance the data, add more computation resources.
RE: [spark-streaming] can shuffle write to disk be disabled?
Yeah, as I said, your job processing time is much larger than the sliding window, and streaming jobs are executed one by one in sequence, so the next job will wait until the first job is finished and the total latency accumulates. I think you need to identify the bottleneck of your job first.

If the shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce the spill, but finally the shuffle data will be written to disk; this cannot be disabled, unless you mount your spark.tmp.dir on a ramdisk.

Besides, if CPU or network is the bottleneck, you might need to add more resources to your cluster.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 3:24 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

> Hi, Saisai
>
> Here is the duration of one of the jobs, 22 seconds in total; it is
> longer than the sliding window.
>
> Stage Id  Description                    Submitted            Duration  Tasks: Succeeded/Total  Shuffle Read  Shuffle Write
> 342       foreach at SimpleApp.scala:58  2015/03/18 15:06:58  16 s      288/288                 10.6 MB
> 341       window at SimpleApp.scala:51   2015/03/18 15:06:52  6 s       288/288                 12.3 MB       10.6 MB
>
> And part of the driver log:
>
> 15/03/18 15:16:36 INFO DStreamGraph: Cleared checkpoint data for time 1426662996000 ms
> 15/03/18 15:16:36 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1426662932000 ms)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 81.0 in stage 392.0 (TID 100515, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 75.0 in stage 392.0 (TID 100509) in 370 ms on lvs02 (75/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 82.0 in stage 392.0 (TID 100516, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 77.0 in stage 392.0 (TID 100511) in 261 ms on lvs02 (76/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 83.0 in stage 392.0 (TID 100517, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 78.0 in stage 392.0 (TID 100512) in 274 ms on lvs02 (77/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 84.0 in stage 392.0 (TID 100518, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 74.0 in stage 392.0 (TID 100508) in 569 ms on lvs02 (78/291)
> 15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996000 in memory on lvs02:38954 (size: 398.3 KB, free: 1073.7 MB)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 85.0 in stage 392.0 (TID 100519, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 76.0 in stage 392.0 (TID 100510) in 539 ms on lvs02 (79/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 86.0 in stage 392.0 (TID 100520, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 80.0 in stage 392.0 (TID 100514) in 296 ms on lvs02 (80/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 87.0 in stage 392.0 (TID 100521, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 81.0 in stage 392.0 (TID 100515) in 292 ms on lvs02 (81/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 88.0 in stage 392.0 (TID 100522, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 82.0 in stage 392.0 (TID 100516) in 331 ms on lvs02 (82/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 89.0 in stage 392.0 (TID 100523, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 83.0 in stage 392.0 (TID 100517) in 271 ms on lvs02 (83/291)
> 15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996200 in memory on lvs02:38954 (size: 31.0 KB, free: 1073.7 MB)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 90.0 in stage 392.0 (TID 100524, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 79.0 in stage 392.0 (TID 100513) in 549 ms on lvs02 (84/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 91.0 in stage 392.0 (TID 100525, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 84.0 in stage 392.0 (TID 100518) in 327 ms on lvs02 (85/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 92.0 in stage 392.0 (TID 100526, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 86.0 in stage 392.0 (TID 100520) in 293 ms on lvs02 (86/291)
> 15/03/18 15:16:36 INFO TaskSetManager: Starting task 93.0 in stage 392.0 (TID 100527, lvs02, PROCESS_LOCAL, 1122 bytes)
> 15/03/18 15:16:36 INFO TaskSetManager: Finished task 87.0 in stage 392.0 (TID 100521) in 257 ms on lvs02 (87/291)
> 15/03/18 15:16:36 INFO TaskSetManager
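[Editor's note: the ramdisk suggestion above can be sketched as follows. The property that controls where Spark writes shuffle scratch files in the 1.x line is spark.local.dir; the mount point and tmpfs size here are illustrative assumptions, not values from the thread.]

```shell
# Sketch (assumed paths/sizes): back Spark's shuffle scratch space with tmpfs.
sudo mkdir -p /mnt/spark-ramdisk
sudo mount -t tmpfs -o size=8g tmpfs /mnt/spark-ramdisk

# Then point Spark's scratch directory at it, e.g. in conf/spark-defaults.conf:
#   spark.local.dir  /mnt/spark-ramdisk
```

Shuffle files then live in RAM, but they still count against the machine's physical memory, so on 16 GB nodes the tmpfs should be sized conservatively.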
RE: [spark-streaming] can shuffle write to disk be disabled?
Please see the inline comments.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 9:30 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Akhil Das
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

> why the shuffle result is written to disk?

This is the internal mechanism for Spark.

> I am quite new to spark, So I am just doing wild guesses. which
> information should I provide further that can help to find the real
> bottleneck?

You can monitor the system metrics, as well as the JVM; information from the web UI is also very useful.

> running two executors, the memory usage is quite low:
>
>   executor 0   8.6 MB / 4.1 GB
>   executor 1  23.9 MB / 4.1 GB
>   driver       0.0 B / 529.9 MB
>
> submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G
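[Editor's note: one concrete way to follow the "monitor the system metrics, as well as JVM" advice is Spark's built-in metrics system, configured through conf/metrics.properties. A minimal sketch using the CSV sink and JVM source that ship with Spark 1.x; the output directory and polling period are arbitrary choices:]

```properties
# conf/metrics.properties — dump metrics from all instances to CSV every 10 s
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics

# expose JVM memory/GC metrics from driver and executors
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
```

The resulting CSV files per executor make it easy to correlate GC time and heap usage with the growing batch latency.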
RE: [spark-streaming] can shuffle write to disk be disabled?
Would you please check your driver log or the streaming web UI to see each job's latency, including processing latency and total latency? From your code, the sliding interval is just 3 seconds, so you will process each 60 seconds' worth of data every 3 seconds; if the processing latency is larger than the sliding interval, maybe your computation power cannot keep up with the qps you wanted. I think you need to identify the bottleneck first, and then try to tune your code, balance the data, or add more computation resources.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

> I use spark-streaming to read messages from Kafka; the producer creates
> about 1500 messages per second.
>
>     def hash(x: String): Int = MurmurHash3.stringHash(x)
>
>     val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
>       StorageLevel.MEMORY_ONLY_SER).map(_._2)
>
>     val clickstream = stream.map(log => {
>       // parse log ...
>       (hash(log.url), HashSet(hash(log.userid)))
>     }).window(Seconds(60), Seconds(3))
>
>     val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
>       case (url, visits) =>
>         val uv = visits.size
>         (uv, url)
>     })
>
>     upv.foreach(rdd => println(new Date() + "\n---\n" +
>       rdd.top(20).mkString("\n") + "\n"))
>
> It is quite quick upon startup, but after running for a few minutes it
> goes slower and slower, and the latency can be minutes. I found several
> gigabytes of shuffle writes under /tmp/spark-. With 1500 qps of messages
> and a window size of 60 seconds, I think it should be done in memory
> without writing to disk at all.
>
> I've set executor-memory to 8G, so there is plenty of memory:
>
>     $SPARK_HOME/bin/spark-submit \
>       --class SimpleApp \
>       --master spark://localhost:7077 \
>       --driver-memory 16G \
>       --executor-memory 8G \
>       target/scala-2.10/simple-assembly-1.0.jar
>
> I also tried these settings, but it still spills to disk:
>
>     spark.master spark://localhost:7077
>     #spark.driver.memory 4g
>     #spark.shuffle.file.buffer.kb 4096
>     #spark.shuffle.memoryFraction 0.8
>     #spark.storage.unrollFraction 0.8
>     #spark.storage.unrollMemoryThreshold 1073741824
>     spark.io.compression.codec lz4
>     spark.shuffle.spill false
>     spark.serializer org.apache.spark.serializer.KryoSerializer
>
> Where am I wrong?
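[Editor's note: with window(Seconds(60), Seconds(3)), every 3-second slide re-shuffles all 60 seconds of raw pairs. Spark Streaming's reduceByKeyAndWindow reduces each batch by key before combining over the window, which can shrink the shuffle substantially. A hedged sketch of that variant, reusing the shapes from the original code (ssc, zkQuorum, group, topicMap, and the elided log parsing are assumed to exist as in SimpleApp; this is not a drop-in tested replacement, and the incremental inverse-function form of reduceByKeyAndWindow is deliberately avoided because set union has no safe inverse):]

```scala
// Sketch only — requires a running StreamingContext and Kafka, as in the
// original SimpleApp; identifiers ssc/zkQuorum/group/topicMap are assumed.
import java.util.Date
import scala.collection.immutable.HashSet
import scala.util.hashing.MurmurHash3

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

def hash(x: String): Int = MurmurHash3.stringHash(x)

val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_ONLY_SER).map(_._2)

// (url-hash, set-of-user-hashes) per record, as in the original code
val pairs = stream.map { log =>
  // parse log ...
  (hash(log.url), HashSet(hash(log.userid)))
}

// Each 3 s batch is reduced by key BEFORE the window combine, so the 60 s
// window merges ~20 small pre-aggregated RDDs instead of re-shuffling
// every raw pair on every slide.
val uvPerUrl = pairs.reduceByKeyAndWindow(_ ++ _, Seconds(60), Seconds(3))

val upv = uvPerUrl.map { case (url, visits) => (visits.size, url) }

// foreachRDD is the non-deprecated spelling of the original foreach
upv.foreachRDD(rdd =>
  println(new Date() + "\n---\n" + rdd.top(20).mkString("\n") + "\n"))
```

Even with this change, completed window results are still written to shuffle files on disk (that is the internal mechanism Saisai describes above); the goal here is only to reduce how much is shuffled per slide.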