[spark-streaming] can shuffle write to disk be disabled?

2015-03-17 Thread Darren Hoo
I use Spark Streaming to read messages from Kafka; the producer creates about
1,500 messages per second

import java.util.Date

import scala.collection.immutable.HashSet
import scala.util.hashing.MurmurHash3

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

def hash(x: String): Int = {
  MurmurHash3.stringHash(x)
}

val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_ONLY_SER).map(_._2)

val clickstream = stream.map(log => {
  // parse log
  ...
  (hash(log.url), HashSet(hash(log.userid)))
}).window(Seconds(60), Seconds(3))

val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
  case (url, visits) =>
    val uv = visits.size
    (uv, url)
})

upv.foreach(rdd => println(new Date() +
  "\n---\n" + rdd.top(20).mkString("\n") + "\n"))


It is quite quick right after startup, but after running for a few minutes it
gets slower and slower, and the latency can reach minutes.


I found several gigabytes of shuffle writes under /tmp/spark-.


With about 1,500 messages per second and a window size of 60 seconds, I think
this should fit in memory without writing to disk at all.
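
As a rough sanity check (the ~200 bytes per serialized record below is just an
assumption for illustration, not a measurement), one 60-second window should
only hold on the order of tens of megabytes:

  // Back-of-envelope window sizing; the per-record size is a guess.
  object WindowSizeEstimate {
    def main(args: Array[String]): Unit = {
      val msgsPerSec  = 1500                            // reported producer rate
      val windowSecs  = 60                              // window length
      val bytesPerRec = 200L                            // assumed size of one serialized (urlHash, HashSet(userHash)) pair
      val recsPerWin  = msgsPerSec.toLong * windowSecs  // 90,000 records
      val mbPerWin    = recsPerWin * bytesPerRec / (1024 * 1024)
      println(s"$recsPerWin records, roughly $mbPerWin MB per 60-second window")
    }
  }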


I've set executor-memory to 8G, so there should be plenty of memory.


$SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master spark://localhost:7077 \
  --driver-memory 16G \
  --executor-memory 8G \
  target/scala-2.10/simple-assembly-1.0.jar


I also tried these settings, but it still spills to disk.


spark.master                          spark://localhost:7077
#spark.driver.memory                  4g
#spark.shuffle.file.buffer.kb         4096
#spark.shuffle.memoryFraction         0.8
#spark.storage.unrollFraction         0.8
#spark.storage.unrollMemoryThreshold  1073741824
spark.io.compression.codec            lz4
spark.shuffle.spill                   false
spark.serializer                      org.apache.spark.serializer.KryoSerializer


Where am I wrong?


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
in stage 392.0 (TID 100621, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 179.0 in stage 392.0 (TID 100613) in 292 ms on lvs02 (181/291)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 188.0 in stage 392.0 (TID 100622, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 182.0 in stage 392.0 (TID 100616) in 213 ms on lvs02 (182/291)
15/03/18 15:16:41 INFO BlockManagerInfo: Added input-0-1426663001400 in memory on lvs02:38954 (size: 24.4 KB, free: 1068.1 MB)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 189.0 in stage 392.0 (TID 100623, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 181.0 in stage 392.0 (TID 100615) in 286 ms on lvs02 (183/291)
15/03/18 15:16:41 INFO TaskSetManager: Starting task 190.0 in stage 392.0 (TID 100624, lvs02, PROCESS_LOCAL, 1122 bytes)
15/03/18 15:16:41 INFO TaskSetManager: Finished task 183.0 in stage 392.0 (TID 100617) in 261 ms on lvs02 (184/291)


Any hints?


Thanks!

On Wed, Mar 18, 2015 at 2:19 PM, Shao, Saisai  wrote:

>  Would you please check your driver log or the streaming web UI to see each
> job's latency, including processing latency and total latency?
>
>
>
> Seems from your code that the sliding interval is just 3 seconds, so you
> will process each 60 seconds' worth of data every 3 seconds. If the
> processing latency is larger than the sliding interval, your computation
> power may not be able to keep up with the qps you want.
>
>
>
> I think you need to identify the bottleneck first, and then try to tune
> your code, balance the data, or add more computation resources.
>
>
>
> Thanks
>
> Jerry
>
>
>
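
For reference, a small sketch (not something from the thread) of how the
per-batch processing and total latency Jerry mentions could also be logged
from the driver; it assumes the Spark 1.x StreamingListener API and a
StreamingContext named ssc:

  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

  // Logs scheduling, processing, and total delay for every completed batch.
  class BatchDelayLogger extends StreamingListener {
    override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
      val info = batch.batchInfo
      println(s"batch ${info.batchTime}: " +
        s"scheduling=${info.schedulingDelay.getOrElse(-1L)} ms, " +
        s"processing=${info.processingDelay.getOrElse(-1L)} ms, " +
        s"total=${info.totalDelay.getOrElse(-1L)} ms")
    }
  }

  // usage: ssc.addStreamingListener(new BatchDelayLogger)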


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai  wrote:

>  Yeah, as I said, your job processing time is much larger than the sliding
> interval, and streaming jobs are executed one by one in sequence, so the
> next job will wait until the previous job is finished and the total latency
> accumulates.
>
> I think you need to identify the bottleneck of your job first. If the
> shuffle is slow, you could enlarge the shuffle fraction of memory to reduce
> the spill, but the final shuffle data will still be written to disk; this
> cannot be disabled unless you mount your spark.tmp.dir on a ramdisk.
>
>
>
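
(As an aside on the ramdisk suggestion above: in Spark 1.x the shuffle scratch
directory is set by spark.local.dir, and pointing it at a tmpfs mount would
look roughly like the sketch below. This is an assumption based on the docs,
not something tried in this thread.)

  # Sketch only: create a RAM-backed mount and point Spark's scratch dir at it.
  sudo mkdir -p /mnt/spark-ramdisk
  sudo mount -t tmpfs -o size=8g tmpfs /mnt/spark-ramdisk

  # conf/spark-defaults.conf (or SPARK_LOCAL_DIRS in spark-env.sh):
  # spark.local.dir   /mnt/spark-ramdisk
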
I have increased spark.shuffle.memoryFraction to 0.8, which I can see in the
Spark UI's environment variables.

But spilling always happens, even right after start when the latency is still
less than the slide interval (I changed it to 10 seconds). The shuffle data
written to disk is really a snowball effect; it slows everything down
eventually.

I noticed that the files spilled to disk are all very small in size but huge
in number:

total 344K
drwxr-xr-x  2 root root 4.0K Mar 18 16:55 .
drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..
-rw-r--r--  1 root root  80K Mar 18 16:54 shuffle_47_519_0.data
-rw-r--r--  1 root root  75K Mar 18 16:54 shuffle_48_419_0.data
-rw-r--r--  1 root root  36K Mar 18 16:54 shuffle_48_518_0.data
-rw-r--r--  1 root root  69K Mar 18 16:55 shuffle_49_319_0.data
-rw-r--r--  1 root root  330 Mar 18 16:55 shuffle_49_418_0.data
-rw-r--r--  1 root root  65K Mar 18 16:55 shuffle_49_517_0.data

MemoryStore says:

15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in memory! (computed 512.0 B so far)
15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB.

Not enough space even for 512 bytes??
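
A back-of-envelope check (assuming the Spark 1.x defaults
spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9,
which the thread does not show): a 529.9 MB storage limit works back to
roughly a 1 GB executor heap, so the 8G executor-memory setting may not
actually be in effect for these executors.

  // Invert the Spark 1.x MemoryStore limit formula:
  //   limit = heap * spark.storage.memoryFraction * spark.storage.safetyFraction
  // The default fractions (0.6 and 0.9) are assumed, not confirmed in the thread.
  object StorageLimitCheck {
    def main(args: Array[String]): Unit = {
      val storageLimitMb = 529.9
      val impliedHeapMb  = storageLimitMb / (0.6 * 0.9)
      println(f"implied executor heap: $impliedHeapMb%.0f MB")  // ~981 MB, i.e. about 1 GB
    }
  }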


The executors still have plenty of free memory (columns from the Executors
tab: ID, address, RDD blocks, memory used, disk used, active / failed /
complete / total tasks, task time, input, shuffle read, shuffle write):

0   slave1:40778   0     0.0 B / 529.9 MB     0.0 B   16  0  15047  15063   2.17 h   0.0 B      402.3 MB   768.0 B
1   slave2:50452   0     0.0 B / 529.9 MB     0.0 B   16  0  14447  14463   2.17 h   0.0 B      388.8 MB   1248.0 B
1   lvs02:47325    116   27.6 MB / 529.9 MB   0.0 B   8   0  58169  58177   3.16 h   893.5 MB   624.0 B    1189.9 MB
    lvs02:47041    0     0.0 B / 529.9 MB     0.0 B   0   0  0      0       0 ms     0.0 B      0.0 B      0.0 B


> Besides, if CPU or network is the bottleneck, you might need to add more
> resources to your cluster.
>
>
>
3 dedicated servers, each with 16 CPU cores, 16 GB memory, and gigabit
network. CPU load is quite low, about 1~3 in top, and network usage is far
from saturated.

I don't even do any useful complex calculations in this small SimpleApp yet.


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
I've already done that:

From the Spark UI Environment tab, Spark properties has:

spark.shuffle.spill    false

On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das wrote:

> I think you can disable it with spark.shuffle.spill=false
>
> Thanks
> Best Regards
>


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai  wrote:

>  From the log you pasted I think this (-rw-r--r--  1 root root  80K Mar
> 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the
> final shuffle result.
>

Why is the shuffle result written to disk?


> As I said, do you think the shuffle is the bottleneck that makes your job
> run slowly?
>

I am quite new to Spark, so I am just making wild guesses. What further
information should I provide to help find the real bottleneck?

> Maybe you should identify the cause first. Besides, from the log it looks
> like your memory is not enough to cache the data; maybe you should increase
> the memory size of the executor.
>
>
>

Running two executors, the memory usage is quite low:

executor 0   8.6 MB / 4.1 GB
executor 1   23.9 MB / 4.1 GB
             0.0 B / 529.9 MB

Submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G


can distinct transform applied on DStream?

2015-03-20 Thread Darren Hoo
val aDstream = ...

val distinctStream = aDstream.transform(_.distinct())

But the elements in distinctStream are not distinct.

Did I use it wrong?
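
For what it's worth, a minimal sketch (my own reading of the DStream API, not
an answer from the thread): transform applies the function to each batch RDD
separately, so distinct only removes duplicates inside a single batch
interval; duplicates arriving in different batches are kept. Deduplicating
over a longer span would need a window first, for example:

  import org.apache.spark.streaming.Seconds
  import org.apache.spark.streaming.dstream.DStream

  // Sketch only: `lines` stands in for any DStream[String] (e.g. from Kafka).
  def dedupExamples(lines: DStream[String]): Unit = {
    // distinct() runs on each batch RDD, so duplicates are removed per batch only.
    val perBatchDistinct = lines.transform(_.distinct())

    // Widening with a 60-second window (an arbitrary choice) de-duplicates
    // across everything currently inside the window.
    val windowedDistinct = lines.window(Seconds(60)).transform(_.distinct())

    perBatchDistinct.print()
    windowedDistinct.print()
  }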