Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Yeah, as I said, your job's processing time is much larger than the sliding
 window, and streaming jobs are executed one by one in sequence, so the next
 job will wait until the first job is finished and the total latency will
 accumulate.



 I think you need to identify the bottleneck of your job first. If the
 shuffle is slow, you could enlarge the shuffle fraction of memory to
 reduce spilling, but the shuffle data will ultimately be written to disk;
 this cannot be disabled unless you mount your spark.tmp.dir on a ramdisk.



I have increased spark.shuffle.memoryFraction to 0.8, which I can confirm
from the Spark UI's environment variables.

But spilling still happens right from the start, even when the latency is less
than the slide interval (I changed it to 10 seconds). The shuffle data written
to disk really snowballs, and the job slows down eventually.
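
For reference, here is a minimal sketch of how those shuffle-related properties
can be set programmatically on the SparkConf (Spark 1.x property names; the
values are the ones mentioned in this thread, so treat it as illustrative
rather than my exact setup):

  import org.apache.spark.SparkConf

  // Sketch only: shuffle settings discussed in this thread (Spark 1.x)
  val conf = new SparkConf()
    .setAppName("SimpleApp")
    .set("spark.shuffle.memoryFraction", "0.8")  // give shuffles more memory to reduce spilling
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")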

I noticed that the files spilled to disk are all very small in size but
huge in number:

total 344K

drwxr-xr-x  2 root root 4.0K Mar 18 16:55 .

drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..

-rw-r--r--  1 root root  80K Mar 18 16:54 shuffle_47_519_0.data

-rw-r--r--  1 root root  75K Mar 18 16:54 shuffle_48_419_0.data

-rw-r--r--  1 root root  36K Mar 18 16:54 shuffle_48_518_0.data

-rw-r--r--  1 root root  69K Mar 18 16:55 shuffle_49_319_0.data

-rw-r--r--  1 root root  330 Mar 18 16:55 shuffle_49_418_0.data

-rw-r--r--  1 root root  65K Mar 18 16:55 shuffle_49_517_0.data

MemoryStore says:

15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory
threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache
rdd_1338_2 in memory! (computed 512.0 B so far)
15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) +
0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage
limit = 529.9 MB.

Not enough space even for 512 bytes??


The executors still have plenty of free memory (Spark UI executors page):

ID      Address       RDD Blocks  Memory Used         Disk Used  Active  Failed  Complete  Total  Task Time  Input     Shuffle Read  Shuffle Write
0       slave1:40778  0           0.0 B / 529.9 MB    0.0 B      16      0       15047     15063  2.17 h     0.0 B     402.3 MB      768.0 B
1       slave2:50452  0           0.0 B / 529.9 MB    0.0 B      16      0       14447     14463  2.17 h     0.0 B     388.8 MB      1248.0 B
1       lvs02:47325   116         27.6 MB / 529.9 MB  0.0 B      8       0       58169     58177  3.16 h     893.5 MB  624.0 B       1189.9 MB
driver  lvs02:47041   0           0.0 B / 529.9 MB    0.0 B      0       0       0         0      0 ms       0.0 B     0.0 B         0.0 B


 Besides, if CPU or network is the bottleneck, you might need to add more
 resources to your cluster.



 3 dedicated servers, each with 16 CPU cores + 16 GB memory, on a Gigabit
network.
 CPU load is quite low, about 1~3 from top, and network usage is far from
saturated.

 I don't even do any useful complex calculations in this small Simple App
yet.


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Akhil Das
I think you can disable it with spark.shuffle.spill=false
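
For example (a sketch only, Spark 1.x property name), set it on the SparkConf
or pass it as a configuration property when submitting:

  import org.apache.spark.SparkConf

  // Sketch: spark.shuffle.spill (Spark 1.x) controls whether in-memory
  // aggregation data may spill to disk during a shuffle.
  val conf = new SparkConf().set("spark.shuffle.spill", "false")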

Thanks
Best Regards


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
I've already done that:

From the Spark UI, Environment > Spark properties shows:

spark.shuffle.spill    false

On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I think you can disable it with spark.shuffle.spill=false



RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
From the log you pasted, I think this (-rw-r--r--  1 root root  80K Mar 18
16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final
shuffle result. As I said, do you think shuffle is the bottleneck that makes
your job run slowly? Maybe you should identify the cause first. Besides, from
the log it looks like your memory is not enough to cache the data; maybe you
should increase the memory size of the executor.



Thanks

Jerry


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai saisai.s...@intel.com wrote:

  From the log you pasted I think this (-rw-r--r--  1 root root  80K Mar
 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the
 final shuffle result.


Why is the shuffle result written to disk?


 As I said, do you think shuffle is the bottleneck that makes your job
 run slowly?


I am quite new to Spark, so I am just making wild guesses. What further
information should I provide to help find the real bottleneck?

Maybe you should identify the cause first. Besides, from the log it looks like
 your memory is not enough to cache the data; maybe you should increase the
 memory size of the executor.




 Running two executors, the memory usage is quite low:

executor 0  8.6 MB / 4.1 GB
executor 1  23.9 MB / 4.1 GB
driver 0.0B / 529.9 MB

Submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
: Finished task 179.0 in stage 392.0
(TID 100613) in 292 ms on lvs02 (181/291)

15/03/18 15:16:41 INFO TaskSetManager: Starting task 188.0 in stage 392.0
(TID 100622, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:41 INFO TaskSetManager: Finished task 182.0 in stage 392.0
(TID 100616) in 213 ms on lvs02 (182/291)

15/03/18 15:16:41 INFO BlockManagerInfo: Added input-0-1426663001400 in
memory on lvs02:38954 (size: 24.4 KB, free: 1068.1 MB)

15/03/18 15:16:41 INFO TaskSetManager: Starting task 189.0 in stage 392.0
(TID 100623, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:41 INFO TaskSetManager: Finished task 181.0 in stage 392.0
(TID 100615) in 286 ms on lvs02 (183/291)

15/03/18 15:16:41 INFO TaskSetManager: Starting task 190.0 in stage 392.0
(TID 100624, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:41 INFO TaskSetManager: Finished task 183.0 in stage 392.0
(TID 100617) in 261 ms on lvs02 (184/291)


Any hints?


Thanks!


RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
Yeah, as I said, your job's processing time is much larger than the sliding
window, and streaming jobs are executed one by one in sequence, so the next job
will wait until the first job is finished and the total latency will
accumulate.

I think you need to identify the bottleneck of your job first. If the shuffle
is slow, you could enlarge the shuffle fraction of memory to reduce spilling,
but the shuffle data will ultimately be written to disk; this cannot be
disabled unless you mount your spark.tmp.dir on a ramdisk.

Besides, if CPU or network is the bottleneck, you might need to add more
resources to your cluster.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 3:24 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

Hi, Saisai

Here is the duration of one of the jobs: 22 seconds in total, which is longer
than the sliding window.

Stage Id  Description                    Submitted            Duration  Tasks: Succeeded/Total  Input    Output  Shuffle Read  Shuffle Write
342       foreach at SimpleApp.scala:58  2015/03/18 15:06:58  16 s      288/288                 -        -       10.6 MB       -
341       window at SimpleApp.scala:51   2015/03/18 15:06:52  6 s       288/288                 12.3 MB  -       -             10.6 MB

And part of the driver log:


15/03/18 15:16:36 INFO DStreamGraph: Cleared checkpoint data for time 
1426662996000 ms

15/03/18 15:16:36 INFO ReceivedBlockTracker: Deleting batches 
ArrayBuffer(1426662932000 ms)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 81.0 in stage 392.0 (TID 
100515, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 75.0 in stage 392.0 (TID 
100509) in 370 ms on lvs02 (75/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 82.0 in stage 392.0 (TID 
100516, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 77.0 in stage 392.0 (TID 
100511) in 261 ms on lvs02 (76/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 83.0 in stage 392.0 (TID 
100517, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 78.0 in stage 392.0 (TID 
100512) in 274 ms on lvs02 (77/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 84.0 in stage 392.0 (TID 
100518, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 74.0 in stage 392.0 (TID 
100508) in 569 ms on lvs02 (78/291)

15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996000 in memory 
on lvs02:38954 (size: 398.3 KB, free: 1073.7 MB)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 85.0 in stage 392.0 (TID 
100519, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 76.0 in stage 392.0 (TID 
100510) in 539 ms on lvs02 (79/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 86.0 in stage 392.0 (TID 
100520, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 80.0 in stage 392.0 (TID 
100514) in 296 ms on lvs02 (80/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 87.0 in stage 392.0 (TID 
100521, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 81.0 in stage 392.0 (TID 
100515) in 292 ms on lvs02 (81/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 88.0 in stage 392.0 (TID 
100522, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 82.0 in stage 392.0 (TID 
100516) in 331 ms on lvs02 (82/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 89.0 in stage 392.0 (TID 
100523, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 83.0 in stage 392.0 (TID 
100517) in 271 ms on lvs02 (83/291)

15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996200 in memory 
on lvs02:38954 (size: 31.0 KB, free: 1073.7 MB)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 90.0 in stage 392.0 (TID 
100524, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 79.0 in stage 392.0 (TID 
100513) in 549 ms on lvs02 (84/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 91.0 in stage 392.0 (TID 
100525, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 84.0 in stage 392.0 (TID 
100518) in 327 ms on lvs02 (85/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 92.0 in stage 392.0 (TID 
100526, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 86.0 in stage 392.0 (TID 
100520) in 293 ms on lvs02 (86/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 93.0 in stage 392.0 (TID 
100527, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 87.0 in stage 392.0 (TID 
100521) in 257 ms on lvs02 (87/291)

15/03/18 15:16:36 INFO TaskSetManager

RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
Please see the inline comments.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 9:30 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Akhil Das
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?



On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:

From the log you pasted I think this (-rw-r--r--  1 root root  80K Mar 18 
16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final 
shuffle result.

Why is the shuffle result written to disk?

This is the internal mechanism for Spark.



As I said, do you think shuffle is the bottleneck that makes your job run
slowly?

I am quite new to Spark, so I am just making wild guesses. What further
information should I provide to help find the real bottleneck?

You can monitor the system metrics as well as the JVM; information from the web
UI is also very useful.



Maybe you should identify the cause first. Besides, from the log it looks like
your memory is not enough to cache the data; maybe you should increase the
memory size of the executor.



 Running two executors, the memory usage is quite low:

executor 0  8.6 MB / 4.1 GB
executor 1  23.9 MB / 4.1 GB
driver 0.0B / 529.9 MB


Submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G




RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
Would you please check your driver log or streaming web UI to see each job's 
latency, including processing latency and total latency.

It seems from your code that the sliding interval is just 3 seconds, so you
will process each 60 seconds' worth of data every 3 seconds. If the processing
latency is larger than the sliding window, maybe your computation power cannot
reach the qps you want.

I think you need to identify the bottleneck first, and then try to tune your
code, balance the data, or add more computation resources.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use spark-streaming to read messages from Kafka; the producer creates about
1500 messages per second.


 def hash(x: String): Int = {
   MurmurHash3.stringHash(x)
 }

 val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
   StorageLevel.MEMORY_ONLY_SER).map(_._2)

 val clickstream = stream.map(log => {
   // parse log
   ...
   (hash(log.url), HashSet(hash(log.userid)))
 }).window(Seconds(60), Seconds(3))

 val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
   case (url, visits) => {
     val uv = visits.size
     (uv, url)
   }
 })

 upv.foreach(rdd => println(new Date() +
   "\n---\n" + rdd.top(20).mkString("\n") + "\n"))



It is quite quick upon startup, but after running for a few minutes it gets
slower and slower, and the latency can reach minutes.



I found a lot of shuffle writes at /tmp/spark-, amounting to several gigabytes.



With 1500 messages per second and a window size of 60 seconds, I think it
should be doable in memory without writing to disk at all.



I've set executor-memory to 8G, so there is plenty of memory.



$SPARK_HOME/bin/spark-submit \

  --class SimpleApp \

  --master spark://localhost:7077 \

  --driver-memory 16G  \

  --executor-memory 8G  \

  target/scala-2.10/simple-assembly-1.0.jar



I also tried these settings, but it still spills to disk.



spark.master spark://localhost:7077

#spark.driver.memory  4g

#spark.shuffle.file.buffer.kb 4096

#spark.shuffle.memoryFraction 0.8

#spark.storage.unrollFraction 0.8

#spark.storage.unrollMemoryThreshold 1073741824

spark.io.compression.codec   lz4

spark.shuffle.spill  false

spark.serializer org.apache.spark.serializer.KryoSerializer



where am I wrong?