Re: Streaming problems running 24x7

2015-04-21 Thread Conor Fennell
Hi,

If the slow memory increase is in the driver, it could be related to this:
https://issues.apache.org/jira/browse/SPARK-5967

*"After some hours disk space is being consumed. There are a lot of*

*directories with name like
"/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"*


Spark doesn't automatically clean up those shuffle directories and in my
experience led to " (java.lang.Exception: Could not compute split, block

input-0-1429168311800 not found) "


One approach is to clean them up manually with a cron job.

Another is to set spark.cleaner.ttl to some reasonable time and also
set spark.streaming.unpersist=true.


Those together cleaned up the shuffle files for us.
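For illustration, a minimal sketch of both approaches; the two property names
are the ones referenced above, while the TTL value and the cron schedule are
arbitrary examples to adapt:

import org.apache.spark.SparkConf

// Cron alternative (shell), e.g. hourly:
//   find /tmp -maxdepth 1 -type d -name 'spark-*' -mmin +600 -exec rm -rf {} +
val conf = new SparkConf()
  .setAppName("streaming-job") // placeholder name
  // Drop metadata and shuffle files older than 30 minutes; keep the TTL
  // comfortably larger than any window the job computes.
  .set("spark.cleaner.ttl", "1800")
  // Unpersist generated RDDs once the streaming job no longer needs them.
  .set("spark.streaming.unpersist", "true")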


-Conor




On Tue, Apr 21, 2015 at 11:27 AM, González Salgado, Miquel <
miquel.gonza...@tecsidel.es> wrote:

> Thank you Luis,
>
> I have tried without the window operation, but the memory leak is still
> present...
>
> I think it must be something related to Spark itself; running an example
> such as TwitterPopularTags, the same thing happens.
>
> I will post something if I find a solution.
>
> [rest of quoted thread trimmed]

RE: Streaming problems running 24x7

2015-04-21 Thread González Salgado, Miquel
Thank you Luis,
I have tried without the window operation, but the memory leak is still
present...
I think it must be something related to Spark itself; running an example such
as TwitterPopularTags, the same thing happens.

I will post something if I find a solution.


From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
Sent: Monday, 20 April 2015 15:46
To: Marius Soutier
CC: González Salgado, Miquel; bit1...@163.com; user
Subject: Re: Streaming problems running 24x7

You have a window operation; I have seen that behaviour before with window 
operations in spark streaming. My solution was to move away from window 
operations using probabilistic data structures; it might not be an option for 
you.

[rest of quoted thread trimmed]

Re: Streaming problems running 24x7

2015-04-20 Thread Luis Ángel Vicente Sánchez
You have a window operation; I have seen that behaviour before with window
operations in spark streaming. My solution was to move away from window
operations using probabilistic data structures; it might not be an option
for you.
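
Luis doesn't say which probabilistic structure he used; as one hedged example,
a count-min sketch keeps approximate per-key counts in fixed memory, so
per-batch topN candidates can be tracked without reduceByKeyAndWindow. A
minimal self-contained sketch (hypothetical helper, not from this thread):

import scala.util.hashing.MurmurHash3

// Count-min sketch: approximate counters in O(depth * width) memory,
// independent of how many distinct keys are seen.
class CountMinSketch(depth: Int, width: Int) extends Serializable {
  private val table = Array.ofDim[Long](depth, width)

  // One hash per row, derived by seeding MurmurHash3 with the row index.
  private def bucket(row: Int, key: String): Int = {
    val h = MurmurHash3.stringHash(key, row) % width
    if (h < 0) h + width else h
  }

  def add(key: String, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(row, key)) += count

  // Minimum over rows: collisions can overestimate but never underestimate.
  def estimate(key: String): Long =
    (0 until depth).map(row => table(row)(bucket(row, key))).min
}

Each batch would add its keys to the sketch and keep a small candidate set of
current top keys, instead of holding a full minute of raw pairs in a window.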

2015-04-20 10:29 GMT+01:00 Marius Soutier :

> [quoted thread trimmed]

Re: Streaming problems running 24x7

2015-04-20 Thread Marius Soutier
The processing speed displayed in the UI doesn’t seem to take everything into 
account. I also had a low processing time but had to increase batch duration 
from 30 seconds to 1 minute because waiting batches kept increasing. Now it 
runs fine.
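
For reference, a sketch of that change; the batch duration is fixed when the
StreamingContext is created (the app name here is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val sparkConf = new SparkConf().setAppName("streaming-job")
// Widen the batch interval so each batch clears before the next one arrives.
val ssc = new StreamingContext(sparkConf, Minutes(1)) // was Seconds(30)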

> On 17.04.2015, at 13:30, González Salgado, Miquel <miquel.gonza...@tecsidel.es> wrote:
>
> [quoted thread trimmed]

RE: Streaming problems running 24x7

2015-04-17 Thread González Salgado, Miquel
Hi,

Thank you for your response.
I think it is not because of the processing speed; in fact the delay is under 1
second, while the batch interval is 10 seconds... The data volume is low (10
lines / second)

Changing to local[8] made the problem worse (CPU increased more quickly)

By the way, I have seen some results changing to this call of KafkaUtils:
KafkaUtils.createDirectStream

CPU usage is low and stable, but memory is slowly increasing... But at least the
process lasts longer..

Best regards,
Miquel


From: bit1...@163.com [mailto:bit1...@163.com]
Sent: Thursday, 16 April 2015 10:58
To: González Salgado, Miquel; user
Subject: Re: Streaming problems running 24x7

From your description, it looks like the data processing speed is far behind
the data receiving speed.

Could you try to increase the number of cores when you submit the application,
such as local[8]?


bit1...@163.com<mailto:bit1...@163.com>

From: Miquel<mailto:miquel.gonza...@tecsidel.es>
Date: 2015-04-16 16:39
To: user<mailto:user@spark.apache.org>
Subject: Streaming problems running 24x7
Hello,
I'm having problems running a Spark Streaming job for more than a few hours
(3 or 4). It begins working OK, but it degrades until failure. Some of the
symptoms:

- Consumed memory and CPU keep getting higher and higher, and finally an
error is thrown (java.lang.Exception: Could not compute split, block
input-0-1429168311800 not found) and data stops being calculated.

- The delay shown in the web UI also keeps increasing.

- After some hours, disk space is being consumed. There are a lot of
directories with names like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c"

The job basically reads information from a Kafka topic and calculates
several topN tables for some key and value fields related to NetFlow data.
Some of the parameters are these:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16 GB RAM, 12 cores), and the
options to run it are as follows:
/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
--jars $JARS --class $APPCLASS --master local[2] $APPJAR

Does someone have some clues about the problem? I don't know if it is a
configuration problem or some error in the code that is causing memory
leaks..

Thank you in advance!
Miquel

PS: the code is basically this: --

object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1

  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String, Int]()
  hm += ( "unix_secs"   ->  0 )
  hm += ( "unix_nsecs"  ->  1 )
  hm += ( "sysuptime"   ->  2 )
  hm += ( "exaddr"      ->  3 )
  hm += ( "dpkts"       ->  4 )
  hm += ( "doctets"     ->  5 )
  hm += ( "first"       ->  6 )
  hm += ( "last"        ->  7 )
  hm += ( "engine_type" ->  8 )
  hm += ( "engine_id"   ->  9 )
  hm += ( "srcaddr"     -> 10 )
  hm += ( "dstaddr"     -> 11 )
  hm += ( "nexthop"     -> 12 )
  hm += ( "input"       -> 13 )
  hm += ( "output"      -> 14 )
  hm += ( "srcport"     -> 15 )
  hm += ( "dstport"     -> 16 )
  hm += ( "prot"        -> 17 )
  hm += ( "tos"         -> 18 )
  hm += ( "tcp_flags"   -> 19 )
  hm += ( "src_mask"    -> 20 )
  hm += ( "dst_mask"    -> 21 )
  hm += ( "src_as"      -> 22 )
  hm += ( "dst_as"      -> 23 )

  def getKey (lcamps: Array[String], camp: String): String = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal (lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal (line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  def writeOutput (data: Array[(Long, String)], keycamps_str: String,
                   csvheader: String, valcamp: String, prefix: String) = {

    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" +
             valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach { t =>
      pw.println(t._2 + "," + t._1)
    }
    pw.close()
    // Write to a timestamped temp file, then rename over the target, so
    // readers never observe a half-written CSV.
    ftmpf.renameTo(f1f)
  }


  def main(

RE: Streaming problems running 24x7

2015-04-17 Thread González Salgado, Miquel
Hi Akhil,

Thank you for your response.
I think it is not because of the processing time; in fact the delay is under 1
second, while the batch interval is 10 seconds… The data volume is low (10
lines / second)

By the way, I have seen some results changing to this call of KafkaUtils:
KafkaUtils.createDirectStream

CPU usage is low and stable, but memory is slowly increasing… But at least the
process lasts longer..
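
For reference, a sketch of that call as it looks in the Spark 1.3 Kafka
integration; the broker list and topic set are placeholders, and ssc is the
StreamingContext from the job code quoted elsewhere in the thread:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-less direct stream: no receiver blocks to age out of memory;
// offsets are tracked by Spark itself instead of ZooKeeper.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("netflow")) // placeholder topic
  .map(_._2)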

Best regards,
Miquel


From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Thursday, 16 April 2015 12:07
To: González Salgado, Miquel
CC: user@spark.apache.org
Subject: Re: Streaming problems running 24x7


I used to hit this issue when my processing time exceeded the batch duration.
Here are a few workarounds:

- Use storage level MEMORY_AND_DISK
- Enable WAL and checkpointing

The above two will slow things down a little bit.

If you want low latency, what you can try is:

- Use storage level MEMORY_ONLY_2 (at least it replicates the received blocks)

- Tachyon-based off-heap storage (haven't tried this, but will let you know)

And from the Spark 1.3.1 version, they have purged the old WAL and it has
better performance. You could try that also.
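
A sketch of the first two workarounds applied to this job; the checkpoint path
is a placeholder, the WAL property name is the Spark 1.2/1.3-era setting, and
sparkConf, ssc, kafpar and topicMap are the values from the job code quoted
elsewhere in the thread:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Log received blocks to a write-ahead log before acknowledging them, and
// spill to disk instead of dropping blocks under memory pressure.
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc.checkpoint("hdfs:///checkpoints/netflow-topn") // placeholder path
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafpar, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)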
On 16 Apr 2015 14:10, "Miquel" <miquel.gonza...@tecsidel.es> wrote:
[quoted thread trimmed]

Re: Streaming problems running 24x7

2015-04-16 Thread Akhil Das
>   def main(args: Array[String]) {
>
> if (args.length < 1) {
>   System.err.println("Usage: NetflowTopn ")
>   System.exit(1)
> }
>
> appPath = args(0)
>
> try {
>val prop = new Properties()
>prop.load(new FileInputStream(appPath + "/conf/app.properties"))
>
>zkQuorum =prop.getProperty("KAFKA_HOST")
>group =   prop.getProperty("KAFKA_GROUP")
>topics =  prop.getProperty("KAFKA_TOPIC")
>numThreads =  prop.getProperty("THREADS").toInt
>
> } catch { case e: Exception =>
>e.printStackTrace()
>sys.exit(1)
> }
>
>     val sparkConf = new SparkConf().setAppName("netflow-topn")
>       .set("spark.default.parallelism", "2")
>       .set("spark.rdd.compress", "true")
>       .set("spark.streaming.unpersist", "true")
>       .set("spark.cleaner.ttl", "300")
>
>     val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))
>
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
>     val kafpar = Map[String, String](
>       "zookeeper.connect" -> zkQuorum,
>       "group.id" -> group,
>       "zookeeper.connection.timeout.ms" -> "5000",
>       "auto.commit.interval.ms" -> "6",
>       "auto.offset.reset" -> "largest"
>     )
>
>     val lines = KafkaUtils.createStream[String, String, StringDecoder,
>       StringDecoder](ssc, kafpar, topicMap,
>       StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()
>
>     val ll_keycamps = List( List("srcaddr", "dstaddr")
>                           , List("dstaddr")
>                           , List("srcaddr")
>                           , List("srcport")
>                           , List("dstport")
>                           , List("total")
>                           )
>
>     val l_valcamps = List( "doctets"
>                          , "dpkts"
>                          , "flows"
>                          )
>
>     for (keycamps <- ll_keycamps) {
>
>       val keycamps_str = keycamps.mkString("-")
>       val csvheader = keycamps.mkString(",") + ",amount"
>
>       for (valcamp <- l_valcamps) {
>
>         val lines2 = lines.map( getKeyVal(_, keycamps, valcamp) ).cache()
>
>         lines2.reduceByKeyAndWindow((a: Long, b: Long) => a + b,
>                                     Seconds(60), Seconds(10))
>           .map(_.swap)
>           .transform(_.sortByKey(false))
>           .foreachRDD(rdd => {
>             val data = rdd.take(20)
>             writeOutput(data, keycamps_str, csvheader, valcamp, "DATAWINDOW")
>           })
>
>         lines2.reduceByKey((a: Long, b: Long) => a + b)
>           .map(_.swap)
>           .transform(_.sortByKey(false))
>           .foreachRDD(rdd => {
>             val data = rdd.take(20)
>             writeOutput(data, keycamps_str, csvheader, valcamp, "DATA")
>           })
>       }
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
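
One note on the windowed reduce above: reduceByKeyAndWindow also has an
invertible variant that adds only the data entering the window and subtracts
the data leaving it, instead of re-reducing the full 60 seconds every
10-second slide; it requires checkpointing. A sketch reusing lines2, ssc and
appPath from the quoted code (an editor's example, not from the thread):

import org.apache.spark.streaming.Seconds

// Incremental window: Spark keeps the running reduced value per key and
// needs a checkpoint directory to recover that state.
ssc.checkpoint(appPath + "/checkpoint")
lines2.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b, // data entering the window
  (a: Long, b: Long) => a - b, // data leaving the window
  Seconds(60),
  Seconds(10))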


Streaming problems running 24x7

2015-04-16 Thread Miquel
, "300")
   
val ssc =  new StreamingContext(sparkConf, Seconds(batch_interval))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
   
val kafpar = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "5000",
  "auto.commit.interval.ms" -> "6",
  "auto.offset.reset" -> "largest"
)

val lines = KafkaUtils.createStream[String, String, StringDecoder,
StringDecoder] (ssc, kafpar, topicMap,
StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()

   val ll_keycamps = List ( List("srcaddr", "dstaddr")
,List("dstaddr")
,List("srcaddr")
,List("srcport")
,List("dstport")
,List("total")
  )

   val l_valcamps = List ("doctets"
 ,"dpkts"
 ,"flows"
  )

   for (keycamps <- ll_keycamps) {

 val keycamps_str = keycamps.mkString("-")
 val csvheader = keycamps.mkString(",") + ",amount"

 for (valcamp <- l_valcamps) { 

  val lines2 = lines.map( getKeyVal (_, keycamps, valcamp )
).cache()

  lines2.reduceByKeyAndWindow((a:Long,b:Long)=>a+b, Seconds(60),
Seconds(10))
  .map(_.swap)
  .transform(_.sortByKey(false))
  .foreachRDD(rdd => {
 val data = rdd.take(20) 
 writeOutput (data, keycamps_str, csvheader,
valcamp, "DATAWINDOW")
 })


  lines2.reduceByKey((a:Long,b:Long)=>a+b)
  .map(_.swap)
  .transform(_.sortByKey(false))
  .foreachRDD(rdd => {
 val data = rdd.take(20) 
 writeOutput (data, keycamps_str, csvheader,
valcamp, "DATA")
 })

 }
}
   
ssc.start()
ssc.awaitTermination()

  }
}






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org