Hi,

If the slow memory increase is in the driver, it could be related to this:
https://issues.apache.org/jira/browse/SPARK-5967

Regarding "After some hours disk space is being consumed. There are a lot of
directories with name like /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c":
Spark doesn't automatically clean up those shuffle directories, and in my
experience that eventually led to "java.lang.Exception: Could not compute
split, block input-0-1429168311800 not found".

One approach is to clean them up manually with a cron job. Another is to set
spark.cleaner.ttl to some reasonable value and also set
spark.streaming.unpersist=true. Those two settings together cleaned up the
shuffle files for us.
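Roughly like this (a minimal sketch, not your exact configuration; the TTL
value is only an example and should be comfortably larger than your window
length plus any processing delay):

    // Sketch: periodic cleanup of old metadata/shuffle data, plus automatic
    // unpersisting of finished batch RDDs. The TTL value is illustrative.
    val conf = new SparkConf()
      .setAppName("netflow-topn")
      .set("spark.streaming.unpersist", "true") // drop RDDs of completed batches
      .set("spark.cleaner.ttl", "3600")         // seconds; example value, tune to your job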
*"After some hours disk space is being consumed. There are a lot of* *directories with name like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"* Spark doesn't automatically clean up those shuffle directories and in my experience led to " (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) " One approach is to clean them up manually with a cron job. Another is to set spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. Those together cleaned up the shuffle files for us. -Conor On Tue, Apr 21, 2015 at 11:27 AM, González Salgado, Miquel < miquel.gonza...@tecsidel.es> wrote: > thank you Luis, > > I have tried without the window operation, but the memory leak is still > present... > > I think it must be something related to spark, running some exemple as > TwitterPopularTags, it happens the same. > > > > I will post something if I found a solution > > > > > > *De:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] > *Enviado el:* lunes, 20 de abril de 2015 15:46 > *Para:* Marius Soutier > *CC:* González Salgado, Miquel; bit1...@163.com; user > *Asunto:* Re: Streaming problems running 24x7 > > > > You have a window operation; I have seen that behaviour before with window > operations in spark streaming. My solution was to move away from window > operations using probabilistic data structures; it might not be an option > for you. > > > > 2015-04-20 10:29 GMT+01:00 Marius Soutier <mps....@gmail.com>: > > The processing speed displayed in the UI doesn’t seem to take everything > into account. I also had a low processing time but had to increase batch > duration from 30 seconds to 1 minute because waiting batches kept > increasing. Now it runs fine. > > > > On 17.04.2015, at 13:30, González Salgado, Miquel < > miquel.gonza...@tecsidel.es> wrote: > > > > Hi, > > > > Thank you for your response, > > I think it is not because of the processing speed, in fact the delay is > under 1 second, while the batch interval is 10 seconds… The data volume is > low (10 lines / second) > > > > Changing to local[8] was worsening the problem (cpu increase more quickly) > > > > By the way, I have seen some results changing to this call of Kafkautils: > > > KafkaUtils.createDirectStream > > > > CPU usage is low and stable, but memory is slowly increasing… But at least > the process last longer.. > > > > Best regards, > > Miquel > > > > > > *De:* bit1...@163.com [mailto:bit1...@163.com <bit1...@163.com>] > *Enviado el:* jueves, 16 de abril de 2015 10:58 > *Para:* González Salgado, Miquel; user > *Asunto:* Re: Streaming problems running 24x7 > > > > From your description, looks like the data processing speed is far behind > the data receiving speed > > > > Could you try to increase the core number when you submit the application? > such as local[8]? > > > ------------------------------ > > bit1...@163.com > > > > *From:* Miquel <miquel.gonza...@tecsidel.es> > > *Date:* 2015-04-16 16:39 > > *To:* user <user@spark.apache.org> > > *Subject:* Streaming problems running 24x7 > > Hello, > > I'm finding problems to run a spark streaming job for more than a few hours > > (3 or 4). It begins working OK, but it degrades until failure. Some of the > > symptoms: > > > > - Consumed memory and CPU keeps getting higher ang higher, and finally some > > error is being thrown (java.lang.Exception: Could not compute split, block > > input-0-1429168311800 not found) and data stops being calculated. > > > > - The delay showed in web UI keeps also increasing. 
From: bit1...@163.com [mailto:bit1...@163.com]
Sent: Thursday, 16 April 2015 10:58
To: González Salgado, Miquel; user
Subject: Re: Streaming problems running 24x7

From your description, it looks like the data processing speed is far behind
the data receiving speed.

Could you try increasing the number of cores when you submit the application,
for example local[8]?

------------------------------
bit1...@163.com

From: Miquel <miquel.gonza...@tecsidel.es>
Date: 2015-04-16 16:39
To: user <user@spark.apache.org>
Subject: Streaming problems running 24x7

Hello,

I'm having problems running a Spark Streaming job for more than a few hours
(3 or 4). It begins working OK, but it degrades until failure. Some of the
symptoms:

- Consumed memory and CPU keep getting higher and higher, until eventually an
  error is thrown (java.lang.Exception: Could not compute split, block
  input-0-1429168311800 not found) and data stops being calculated.

- The delay shown in the web UI also keeps increasing.

- After some hours, disk space is being consumed. There are a lot of
  directories with names like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c".

The job basically reads information from a Kafka topic and calculates several
topN tables for some key and value fields related to NetFlow data. Some of the
parameters are:

- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16 GB RAM, 12 cores), and it is
launched as follows:

    /opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
      --jars $JARS --class $APPCLASS --master local[2] $APPJAR

Does someone have any clue about the problem? I don't know if it is a
configuration problem or some error in the code that is causing memory leaks.

Thank you in advance!
Miquel

PS: the code is basically this: --------------------------------------

import java.io.{File, FileInputStream, PrintWriter}
import java.util.Properties

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1

  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  // Column positions of the NetFlow fields in each CSV line
  var hm = Map[String, Int]()
  hm += ("unix_secs"   -> 0)
  hm += ("unix_nsecs"  -> 1)
  hm += ("sysuptime"   -> 2)
  hm += ("exaddr"      -> 3)
  hm += ("dpkts"       -> 4)
  hm += ("doctets"     -> 5)
  hm += ("first"       -> 6)
  hm += ("last"        -> 7)
  hm += ("engine_type" -> 8)
  hm += ("engine_id"   -> 9)
  hm += ("srcaddr"     -> 10)
  hm += ("dstaddr"     -> 11)
  hm += ("nexthop"     -> 12)
  hm += ("input"       -> 13)
  hm += ("output"      -> 14)
  hm += ("srcport"     -> 15)
  hm += ("dstport"     -> 16)
  hm += ("prot"        -> 17)
  hm += ("tos"         -> 18)
  hm += ("tcp_flags"   -> 19)
  hm += ("src_mask"    -> 20)
  hm += ("dst_mask"    -> 21)
  hm += ("src_as"      -> 22)
  hm += ("dst_as"      -> 23)

  def getKey(lcamps: Array[String], camp: String): String = {
    if (camp == "total") "total"
    else lcamps(hm(camp))
  }

  def getVal(lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") 1L
    else lcamps(hm(camp)).toLong
  }

  def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  // Writes a topN table to a temporary file, then renames it into place
  def writeOutput(data: Array[(Long, String)], keycamps_str: String,
                  csvheader: String, valcamp: String, prefix: String) = {
    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach { t =>
      pw.println(t._2 + "," + t._1)
    }
    pw.close()
    ftmpf.renameTo(f1f)
  }

  def main(args: Array[String]) {

    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn <apppath>")
      System.exit(1)
    }

    appPath = args(0)

    try {
      val prop = new Properties()
      prop.load(new FileInputStream(appPath + "/conf/app.properties"))

      zkQuorum = prop.getProperty("KAFKA_HOST")
      group = prop.getProperty("KAFKA_GROUP")
      topics = prop.getProperty("KAFKA_TOPIC")
      numThreads = prop.getProperty("THREADS").toInt
    } catch {
      case e: Exception =>
        e.printStackTrace()
        sys.exit(1)
    }
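    // Note (not in the original code): the two settings mentioned at the top of
    // this thread (spark.streaming.unpersist and spark.cleaner.ttl) are already
    // set just below, with a 300-second TTL. If the scheduling delay grows to the
    // point where blocks older than the TTL are still needed, the cleaner can
    // remove them too early, which is one known cause of the "Could not compute
    // split, block input-... not found" error reported above.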
    val sparkConf = new SparkConf().setAppName("netflow-topn")
      .set("spark.default.parallelism", "2")
      .set("spark.rdd.compress", "true")
      .set("spark.streaming.unpersist", "true")
      .set("spark.cleaner.ttl", "300")

    val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    val kafpar = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> group,
      "zookeeper.connection.timeout.ms" -> "5000",
      "auto.commit.interval.ms" -> "60000",
      "auto.offset.reset" -> "largest"
    )

    // Receiver-based Kafka stream; each line is a comma-separated NetFlow record
    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafpar, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()

    val ll_keycamps = List(List("srcaddr", "dstaddr"),
                           List("dstaddr"),
                           List("srcaddr"),
                           List("srcport"),
                           List("dstport"),
                           List("total"))

    val l_valcamps = List("doctets",
                          "dpkts",
                          "flows")

    for (keycamps <- ll_keycamps) {

      val keycamps_str = keycamps.mkString("-")
      val csvheader = keycamps.mkString(",") + ",amount"

      for (valcamp <- l_valcamps) {

        val lines2 = lines.map(getKeyVal(_, keycamps, valcamp)).cache()

        // Top 20 over a sliding 60-second window, computed every 10 seconds
        lines2.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(60), Seconds(10))
          .map(_.swap)
          .transform(_.sortByKey(false))
          .foreachRDD(rdd => {
            val data = rdd.take(20)
            writeOutput(data, keycamps_str, csvheader, valcamp, "DATAWINDOW")
          })

        // Top 20 per batch
        lines2.reduceByKey((a: Long, b: Long) => a + b)
          .map(_.swap)
          .transform(_.sortByKey(false))
          .foreachRDD(rdd => {
            val data = rdd.take(20)
            writeOutput(data, keycamps_str, csvheader, valcamp, "DATA")
          })
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org