Hi,

Thank you for your response,
I think it is not because of the processing speed, in fact the delay is under 1 
second, while the batch interval is 10 seconds... The data volume is low (10 
lines / second)

Changing to local[8] was worsening the problem (cpu increase more quickly)

By the way, I have seen some results changing to this call of Kafkautils:
KafkaUtils.createDirectStream

CPU usage is low and stable, but memory is slowly increasing... But at least 
the process last longer..

Best regards,
Miquel


De: bit1...@163.com [mailto:bit1...@163.com]
Enviado el: jueves, 16 de abril de 2015 10:58
Para: González Salgado, Miquel; user
Asunto: Re: Streaming problems running 24x7

>From your description, looks like the data processing speed is far behind the 
>data receiving speed

Could you try to increase the core number when you submit the application? such 
as local[8]?

________________________________
bit1...@163.com<mailto:bit1...@163.com>

From: Miquel<mailto:miquel.gonza...@tecsidel.es>
Date: 2015-04-16 16:39
To: user<mailto:user@spark.apache.org>
Subject: Streaming problems running 24x7
Hello,
I'm finding problems to run a spark streaming job for more than a few hours
(3 or 4). It begins working OK, but it degrades until failure. Some of the
symptoms:

- Consumed memory and CPU keeps getting higher ang higher, and finally some
error is being thrown (java.lang.Exception: Could not compute split, block
input-0-1429168311800 not found) and data stops being calculated.

- The delay showed in web UI keeps also increasing.

- After some hours disk space is being consumed. There are a lot of
directories with name like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"

The job is basically reading information from kafka topic, and calculate
several topN tables for some key and value camps related with netflow data,
some of the parameters are this:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16GB RAM , 12 cores), and the
options to run it is as follows:
/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
--jars $JARS --class $APPCLASS --master local[2] $APPJAR

someone has some clues about the problem? I don't know if it is a
configuration problem or some error in the code that is causing memory
leaks..

Thank you in advance!
Miquel

PD: the code is basically this:--------------------------------------

object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1

  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String,Int]()
  hm += ( "unix_secs" ->  0 )
  hm += ( "unix_nsecs" -> 1 )
  hm += ( "sysuptime" ->  2 )
  hm += ( "exaddr" ->     3 )
  hm += ( "dpkts" ->      4 )
  hm += ( "doctets" ->    5 )
  hm += ( "first" ->      6 )
  hm += ( "last" ->       7 )
  hm += ( "engine_type" -> 8 )
  hm += ( "engine_id" ->   9 )
  hm += ( "srcaddr" ->    10 )
  hm += ( "dstaddr" ->    11 )
  hm += ( "nexthop" ->    12 )
  hm += ( "input" ->      13 )
  hm += ( "output" ->     14 )
  hm += ( "srcport" ->    15 )
  hm += ( "dstport" ->    16 )
  hm += ( "prot" ->       17 )
  hm += ( "tos" ->        18 )
  hm += ( "tcp_flags" ->  19 )
  hm += ( "src_mask" ->   20 )
  hm += ( "dst_mask" ->   21 )
  hm += ( "src_as" ->     22 )
  hm += ( "dst_as" ->     23 )

  def getKey (lcamps: Array[String], camp: String): String  = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal (lcamps: Array[String], camp: String): Long  = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal (line: String, keycamps: List[String], valcamp: String ) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(",")   ,   getVal(arr,valcamp) )
  }

  def writeOutput (data: Array[(Long, String)], keycamps_str: String,
csvheader: String, valcamp: String, prefix: String) = {

           val ts = System.currentTimeMillis
           val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" +
valcamp + ".csv"
           val f1f = new File(f1);
           val ftmpf = new File(f1 + ts);
           val pw = new PrintWriter(ftmpf)
           pw.println(csvheader)
           data.foreach{
                t =>  pw.println (t._2 + "," + t._1)
           }
           pw.close
           ftmpf.renameTo(f1f);

  }


  def main(args: Array[String]) {

    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn <apppath>")
      System.exit(1)
    }

    appPath = args(0)

    try {
       val prop = new Properties()
       prop.load(new FileInputStream(appPath + "/conf/app.properties"))

       zkQuorum =        prop.getProperty("KAFKA_HOST")
       group =           prop.getProperty("KAFKA_GROUP")
       topics =          prop.getProperty("KAFKA_TOPIC")
       numThreads =      prop.getProperty("THREADS").toInt

    } catch { case e: Exception =>
       e.printStackTrace()
       sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("netflow-topn")
                                   .set("spark.default.parallelism", "2")
                                   .set("spark.rdd.compress", "true")
   .set("spark.streaming.unpersist", "true")
                                   .set("spark.cleaner.ttl", "300")

    val ssc =  new StreamingContext(sparkConf, Seconds(batch_interval))

    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

    val kafpar = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> group,
      "zookeeper.connection.timeout.ms" -> "5000",
      "auto.commit.interval.ms" -> "60000",
      "auto.offset.reset" -> "largest"
    )

    val lines = KafkaUtils.createStream[String, String, StringDecoder,
StringDecoder] (ssc, kafpar, topicMap,
StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()

   val ll_keycamps = List ( List("srcaddr", "dstaddr")
                            ,List("dstaddr")
                            ,List("srcaddr")
                            ,List("srcport")
                            ,List("dstport")
                            ,List("total")
                          )

   val l_valcamps = List ("doctets"
                         ,"dpkts"
                         ,"flows"
                          )

   for (keycamps <- ll_keycamps) {

         val keycamps_str = keycamps.mkString("-")
         val csvheader = keycamps.mkString(",") + ",amount"

         for (valcamp <- l_valcamps) {

              val lines2 = lines.map( getKeyVal (_, keycamps, valcamp )
).cache()

              lines2.reduceByKeyAndWindow((a:Long,b:Long)=>a+b, Seconds(60),
Seconds(10))
                      .map(_.swap)
                      .transform(_.sortByKey(false))
                      .foreachRDD(rdd => {
                             val data = rdd.take(20)
                             writeOutput (data, keycamps_str, csvheader,
valcamp, "DATAWINDOW")
                     })


              lines2.reduceByKey((a:Long,b:Long)=>a+b)
                      .map(_.swap)
                      .transform(_.sortByKey(false))
                      .foreachRDD(rdd => {
                             val data = rdd.take(20)
                             writeOutput (data, keycamps_str, csvheader,
valcamp, "DATA")
                     })

         }
    }

    ssc.start()
    ssc.awaitTermination()

  }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>

Reply via email to