I'm using Spark Streaming to read messages from Kafka; the producer writes
about 1500 messages per second.

    import scala.util.hashing.MurmurHash3

    // hash a url / userid string down to an Int
    def hash(x: String): Int = {
      MurmurHash3.stringHash(x)
    }


    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
      StorageLevel.MEMORY_ONLY_SER).map(_._2)


    val clickstream = stream.map(log => {
      // parse log
      ...
      (hash(log.url), HashSet(hash(log.userid)))
    }).window(Seconds(60), Seconds(3))


    val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
      case (url, visits) =>
        val uv = visits.size
        (uv, url)
    })


    // print the top 20 urls by unique-visitor count every batch
    upv.foreach(rdd => println(new Date() +
      "\n---------------------------------------\n" +
      rdd.top(20).mkString("\n") + "\n"))


It is quite fast right after startup, but after running for a few minutes it
gets slower and slower, and the processing latency can reach several minutes.


I also found several gigabytes of shuffle writes under /tmp/spark-xxxx.


With about 1500 messages per second and a 60-second window, I think the whole
computation should fit in memory without writing to disk at all.
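
Just to spell out the back-of-the-envelope math behind that expectation (the
per-record size below is my own rough guess, not something I measured):

    // rough sizing of one 60-second window, using assumed numbers
    val msgsPerSec       = 1500
    val windowSec        = 60
    val recordsPerWindow = msgsPerSec * windowSec   // 90,000 (urlHash, HashSet(userHash)) pairs
    val assumedBytesPerRecord = 100L                // guess for one serialized pair
    val approxBytes = recordsPerWindow * assumedBytesPerRecord  // ~9,000,000 bytes, far below 8G
    println(s"$recordsPerWindow records per window, roughly ${approxBytes / 1000000} MB")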


I've set executor-memory to 8G, so there should be plenty of memory.


$SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master spark://localhost:7077 \
  --driver-memory 16G \
  --executor-memory 8G \
  target/scala-2.10/simple-assembly-1.0.jar


I also tried these settings, but it still spills to disk.


spark.master                     spark://localhost:7077
#spark.driver.memory              4g
#spark.shuffle.file.buffer.kb     4096
#spark.shuffle.memoryFraction     0.8
#spark.storage.unrollFraction     0.8
#spark.storage.unrollMemoryThreshold 1073741824
spark.io.compression.codec       lz4
spark.shuffle.spill              false
spark.serializer                 org.apache.spark.serializer.KryoSerializer


Where am I going wrong?
