Hi,

If the slow memory increase is in the driver, it could be related to this:
https://issues.apache.org/jira/browse/SPARK-5967

*"After some hours disk space is being consumed. There are a lot of*

*directories with name like
"/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"*


Spark doesn't automatically clean up those shuffle directories and in my
experience led to " (java.lang.Exception: Could not compute split, block

input-0-1429168311800 not found) "


One approach is to clean them up manually with a cron job.

Another is to set spark.cleaner.ttl to some reasonable time and also
set spark.streaming.unpersist=true.


Those together cleaned up the shuffle files for us.


-Conor




On Tue, Apr 21, 2015 at 11:27 AM, González Salgado, Miquel <
miquel.gonza...@tecsidel.es> wrote:

> thank you Luis,
>
> I have tried without the window operation, but the memory leak is still
> present...
>
> I think it must be something related to spark, running some exemple as
> TwitterPopularTags, it happens the same.
>
>
>
> I will post something if I found a solution
>
>
>
>
>
> *De:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
> *Enviado el:* lunes, 20 de abril de 2015 15:46
> *Para:* Marius Soutier
> *CC:* González Salgado, Miquel; bit1...@163.com; user
> *Asunto:* Re: Streaming problems running 24x7
>
>
>
> You have a window operation; I have seen that behaviour before with window
> operations in spark streaming. My solution was to move away from window
> operations using probabilistic data structures; it might not be an option
> for you.
>
>
>
> 2015-04-20 10:29 GMT+01:00 Marius Soutier <mps....@gmail.com>:
>
> The processing speed displayed in the UI doesn’t seem to take everything
> into account. I also had a low processing time but had to increase batch
> duration from 30 seconds to 1 minute because waiting batches kept
> increasing. Now it runs fine.
>
>
>
> On 17.04.2015, at 13:30, González Salgado, Miquel <
> miquel.gonza...@tecsidel.es> wrote:
>
>
>
> Hi,
>
>
>
> Thank you for your response,
>
> I think it is not because of the processing speed, in fact the delay is
> under 1 second, while the batch interval is 10 seconds… The data volume is
> low (10 lines / second)
>
>
>
> Changing to local[8] was worsening the problem (cpu increase more quickly)
>
>
>
> By the way, I have seen some results changing to this call of Kafkautils:
>
>
> KafkaUtils.createDirectStream
>
>
>
> CPU usage is low and stable, but memory is slowly increasing… But at least
> the process last longer..
>
>
>
> Best regards,
>
> Miquel
>
>
>
>
>
> *De:* bit1...@163.com [mailto:bit1...@163.com <bit1...@163.com>]
> *Enviado el:* jueves, 16 de abril de 2015 10:58
> *Para:* González Salgado, Miquel; user
> *Asunto:* Re: Streaming problems running 24x7
>
>
>
> From your description, looks like the data processing speed is far behind
> the data receiving speed
>
>
>
> Could you try to increase the core number when you submit the application?
> such as local[8]?
>
>
> ------------------------------
>
> bit1...@163.com
>
>
>
> *From:* Miquel <miquel.gonza...@tecsidel.es>
>
> *Date:* 2015-04-16 16:39
>
> *To:* user <user@spark.apache.org>
>
> *Subject:* Streaming problems running 24x7
>
> Hello,
>
> I'm finding problems to run a spark streaming job for more than a few hours
>
> (3 or 4). It begins working OK, but it degrades until failure. Some of the
>
> symptoms:
>
>
>
> - Consumed memory and CPU keeps getting higher ang higher, and finally some
>
> error is being thrown (java.lang.Exception: Could not compute split, block
>
> input-0-1429168311800 not found) and data stops being calculated.
>
>
>
> - The delay showed in web UI keeps also increasing.
>
>
>
> - After some hours disk space is being consumed. There are a lot of
>
> directories with name like
> "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"
>
>
>
> The job is basically reading information from kafka topic, and calculate
>
> several topN tables for some key and value camps related with netflow data,
>
> some of the parameters are this:
>
> - batch interval: 10 seconds
>
> - window calculation: 1 minute
>
> - spark.cleaner.ttl: 5 minutes
>
>
>
> The execution is standalone on one machine (16GB RAM , 12 cores), and the
>
> options to run it is as follows:
>
> /opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
>
> --jars $JARS --class $APPCLASS --master local[2] $APPJAR
>
>
>
> someone has some clues about the problem? I don't know if it is a
>
> configuration problem or some error in the code that is causing memory
>
> leaks..
>
>
>
> Thank you in advance!
>
> Miquel
>
>
>
> PD: the code is basically this:--------------------------------------
>
>
>
> object NetflowTopn {
>
>
>
>   var appPath = "."
>
>   var zkQuorum = ""
>
>   var group = ""
>
>   var topics = ""
>
>   var numThreads = 1
>
>
>
>   var batch_interval = 10
>
>   var n_window = 1
>
>   var n_slide = 1
>
>   var topnsize = 10
>
>
>
>   var hm = Map[String,Int]()
>
>   hm += ( "unix_secs" ->  0 )
>
>   hm += ( "unix_nsecs" -> 1 )
>
>   hm += ( "sysuptime" ->  2 )
>
>   hm += ( "exaddr" ->     3 )
>
>   hm += ( "dpkts" ->      4 )
>
>   hm += ( "doctets" ->    5 )
>
>   hm += ( "first" ->      6 )
>
>   hm += ( "last" ->       7 )
>
>   hm += ( "engine_type" -> 8 )
>
>   hm += ( "engine_id" ->   9 )
>
>   hm += ( "srcaddr" ->    10 )
>
>   hm += ( "dstaddr" ->    11 )
>
>   hm += ( "nexthop" ->    12 )
>
>   hm += ( "input" ->      13 )
>
>   hm += ( "output" ->     14 )
>
>   hm += ( "srcport" ->    15 )
>
>   hm += ( "dstport" ->    16 )
>
>   hm += ( "prot" ->       17 )
>
>   hm += ( "tos" ->        18 )
>
>   hm += ( "tcp_flags" ->  19 )
>
>   hm += ( "src_mask" ->   20 )
>
>   hm += ( "dst_mask" ->   21 )
>
>   hm += ( "src_as" ->     22 )
>
>   hm += ( "dst_as" ->     23 )
>
>
>
>   def getKey (lcamps: Array[String], camp: String): String  = {
>
>     if (camp == "total") return "total"
>
>     else return lcamps(hm(camp))
>
>   }
>
>
>
>   def getVal (lcamps: Array[String], camp: String): Long  = {
>
>     if (camp == "flows") return 1L
>
>     else return lcamps(hm(camp)).toLong
>
>   }
>
>
>
>   def getKeyVal (line: String, keycamps: List[String], valcamp: String ) =
> {
>
>     val arr = line.split(",")
>
>     (keycamps.map(getKey(arr, _)).mkString(",")   ,   getVal(arr,valcamp) )
>
>   }
>
>
>
>   def writeOutput (data: Array[(Long, String)], keycamps_str: String,
>
> csvheader: String, valcamp: String, prefix: String) = {
>
>
>
>            val ts = System.currentTimeMillis
>
>            val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_"
> +
>
> valcamp + ".csv"
>
>            val f1f = new File(f1);
>
>            val ftmpf = new File(f1 + ts);
>
>            val pw = new PrintWriter(ftmpf)
>
>            pw.println(csvheader)
>
>            data.foreach{
>
>                 t =>  pw.println (t._2 + "," + t._1)
>
>            }
>
>            pw.close
>
>            ftmpf.renameTo(f1f);
>
>
>
>   }
>
>
>
>
>
>   def main(args: Array[String]) {
>
>
>
>     if (args.length < 1) {
>
>       System.err.println("Usage: NetflowTopn <apppath>")
>
>       System.exit(1)
>
>     }
>
>
>
>     appPath = args(0)
>
>
>
>     try {
>
>        val prop = new Properties()
>
>        prop.load(new FileInputStream(appPath + "/conf/app.properties"))
>
>
>
>        zkQuorum =        prop.getProperty("KAFKA_HOST")
>
>        group =           prop.getProperty("KAFKA_GROUP")
>
>        topics =          prop.getProperty("KAFKA_TOPIC")
>
>        numThreads =      prop.getProperty("THREADS").toInt
>
>
>
>     } catch { case e: Exception =>
>
>        e.printStackTrace()
>
>        sys.exit(1)
>
>     }
>
>
>
>     val sparkConf = new SparkConf().setAppName("netflow-topn")
>
>                                    .set("spark.default.parallelism", "2")
>
>                                    .set("spark.rdd.compress", "true")
>
>    .set("spark.streaming.unpersist", "true")
>
>                                    .set("spark.cleaner.ttl", "300")
>
>
>
>     val ssc =  new StreamingContext(sparkConf, Seconds(batch_interval))
>
>
>
>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>
>
>
>     val kafpar = Map[String, String](
>
>       "zookeeper.connect" -> zkQuorum,
>
>       "group.id" -> group,
>
>       "zookeeper.connection.timeout.ms" -> "5000",
>
>       "auto.commit.interval.ms" -> "60000",
>
>       "auto.offset.reset" -> "largest"
>
>     )
>
>
>
>     val lines = KafkaUtils.createStream[String, String, StringDecoder,
>
> StringDecoder] (ssc, kafpar, topicMap,
>
> StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()
>
>
>
>    val ll_keycamps = List ( List("srcaddr", "dstaddr")
>
>                             ,List("dstaddr")
>
>                             ,List("srcaddr")
>
>                             ,List("srcport")
>
>                             ,List("dstport")
>
>                             ,List("total")
>
>                           )
>
>
>
>    val l_valcamps = List ("doctets"
>
>                          ,"dpkts"
>
>                          ,"flows"
>
>                           )
>
>
>
>    for (keycamps <- ll_keycamps) {
>
>
>
>          val keycamps_str = keycamps.mkString("-")
>
>          val csvheader = keycamps.mkString(",") + ",amount"
>
>
>
>          for (valcamp <- l_valcamps) {
>
>
>
>               val lines2 = lines.map( getKeyVal (_, keycamps, valcamp )
>
> ).cache()
>
>
>
>               lines2.reduceByKeyAndWindow((a:Long,b:Long)=>a+b,
> Seconds(60),
>
> Seconds(10))
>
>                       .map(_.swap)
>
>                       .transform(_.sortByKey(false))
>
>                       .foreachRDD(rdd => {
>
>                              val data = rdd.take(20)
>
>                              writeOutput (data, keycamps_str, csvheader,
>
> valcamp, "DATAWINDOW")
>
>                      })
>
>
>
>
>
>               lines2.reduceByKey((a:Long,b:Long)=>a+b)
>
>                       .map(_.swap)
>
>                       .transform(_.sortByKey(false))
>
>                       .foreachRDD(rdd => {
>
>                              val data = rdd.take(20)
>
>                              writeOutput (data, keycamps_str, csvheader,
>
> valcamp, "DATA")
>
>                      })
>
>
>
>          }
>
>     }
>
>
>
>     ssc.start()
>
>     ssc.awaitTermination()
>
>
>
>   }
>
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
> ---------------------------------------------------------------------
>
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>

Reply via email to