Re: Streaming problems running 24x7
Hi,

If the slow memory increase is in the driver, it could be related to this:
https://issues.apache.org/jira/browse/SPARK-5967

> "After some hours disk space is being consumed. There are a lot of
> directories with name like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c""

Spark doesn't automatically clean up those shuffle directories, and in my
experience that eventually led to "java.lang.Exception: Could not compute
split, block input-0-1429168311800 not found".

One approach is to clean them up manually with a cron job. Another is to set
spark.cleaner.ttl to some reasonable time and also set
spark.streaming.unpersist=true. Together, those cleaned up the shuffle files
for us.

-Conor

On Tue, Apr 21, 2015 at 11:27 AM, González Salgado, Miquel
<miquel.gonza...@tecsidel.es> wrote:

> Thank you Luis,
>
> I have tried without the window operation, but the memory leak is still
> present... I think it must be something related to Spark; running an
> example such as TwitterPopularTags, the same thing happens.
>
> I will post something if I find a solution.
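Conor's two settings go into the SparkConf when the context is built (the original post already sets spark.cleaner.ttl to 300). A minimal sketch of what he describes, for a Spark 1.x-era job; the 600-second TTL is an illustrative assumption, not a recommendation:

```scala
import org.apache.spark.SparkConf

// Cleanup settings Conor recommends. The TTL value (600 s) is an assumed
// example; pick something comfortably larger than your longest window so
// shuffle data that is still in use is not deleted out from under a batch.
val conf = new SparkConf()
  .setAppName("netflow-topn")
  .set("spark.cleaner.ttl", "600")          // forget metadata/shuffle files older than 10 min
  .set("spark.streaming.unpersist", "true") // aggressively drop RDDs the stream no longer needs
```

The TTL has to outlive the window duration, otherwise the cleaner can delete blocks a windowed computation still references.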
RE: Streaming problems running 24x7
Thank you Luis,

I have tried without the window operation, but the memory leak is still
present... I think it must be something related to Spark; running an example
such as TwitterPopularTags, the same thing happens.

I will post something if I find a solution.

From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
Sent: Monday, 20 April 2015 15:46
To: Marius Soutier
Cc: González Salgado, Miquel; bit1...@163.com; user
Subject: Re: Streaming problems running 24x7

You have a window operation; I have seen that behaviour before with window
operations in Spark Streaming. My solution was to move away from window
operations using probabilistic data structures; it might not be an option
for you.
Re: Streaming problems running 24x7
You have a window operation; I have seen that behaviour before with window
operations in Spark Streaming. My solution was to move away from window
operations using probabilistic data structures; it might not be an option
for you.

2015-04-20 10:29 GMT+01:00 Marius Soutier:

> The processing speed displayed in the UI doesn’t seem to take everything
> into account. I also had a low processing time but had to increase batch
> duration from 30 seconds to 1 minute because waiting batches kept
> increasing. Now it runs fine.
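Luis's "probabilistic data structures" suggestion trades exact windowed counts for a fixed-memory approximation, so driver memory stops growing with key cardinality. A minimal Count-Min Sketch in plain Scala as one possible shape of that idea; the width/depth and the per-row mixing constant are illustrative choices, not tuned values:

```scala
// Minimal Count-Min Sketch: memory is width * depth counters, regardless of
// how many distinct keys are seen. Estimates are upper bounds on the true
// count -- hash collisions can only inflate them, never undercount.
class CountMinSketch(width: Int = 1024, depth: Int = 4) {
  private val table = Array.ofDim[Long](depth, width)

  // One hash function per row: a Knuth-style multiplicative mix of
  // String.hashCode, varied by row index.
  private def bucket(row: Int, key: String): Int = {
    val h = key.hashCode.toLong * (2654435761L + row)
    (((h % width) + width) % width).toInt // force into [0, width)
  }

  def add(key: String, count: Long = 1L): Unit =
    for (r <- 0 until depth) table(r)(bucket(r, key)) += count

  // The minimum across rows is the standard CMS point estimate.
  def estimate(key: String): Long =
    (0 until depth).map(r => table(r)(bucket(r, key))).min
}
```

Feeding each batch's keys through add and maintaining a small top-N candidate set bounds memory, where reduceByKeyAndWindow keeps every distinct key's state alive for the full window.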
Re: Streaming problems running 24x7
The processing speed displayed in the UI doesn’t seem to take everything
into account. I also had a low processing time but had to increase batch
duration from 30 seconds to 1 minute because waiting batches kept
increasing. Now it runs fine.

> On 17.04.2015, at 13:30, González Salgado, Miquel wrote:
>
> Hi,
>
> Thank you for your response,
> I think it is not because of the processing speed; in fact the delay is
> under 1 second, while the batch interval is 10 seconds… The data volume is
> low (10 lines / second).
>
> Changing to local[8] made the problem worse (CPU increased more quickly).
>
> By the way, I have seen some results changing to this call of KafkaUtils:
>
> KafkaUtils.createDirectStream
>
> CPU usage is low and stable, but memory is slowly increasing… But at least
> the process lasts longer.
>
> Best regards,
> Miquel
RE: Streaming problems running 24x7
Hi,

Thank you for your response,
I think it is not because of the processing speed; in fact the delay is
under 1 second, while the batch interval is 10 seconds... The data volume is
low (10 lines / second).

Changing to local[8] made the problem worse (CPU increased more quickly).

By the way, I have seen some results changing to this call of KafkaUtils:

KafkaUtils.createDirectStream

CPU usage is low and stable, but memory is slowly increasing... But at least
the process lasts longer.

Best regards,
Miquel

From: bit1...@163.com [mailto:bit1...@163.com]
Sent: Thursday, 16 April 2015 10:58
To: González Salgado, Miquel; user
Subject: Re: Streaming problems running 24x7

From your description, it looks like the data processing speed is far
behind the data receiving speed.

Could you try to increase the core count when you submit the application,
such as local[8]?

From: Miquel
Date: 2015-04-16 16:39
To: user
Subject: Streaming problems running 24x7

Hello,
I'm finding problems running a spark streaming job for more than a few
hours (3 or 4). It begins working OK, but it degrades until failure. Some
of the symptoms:

- Consumed memory and CPU keep getting higher and higher, and finally some
error is thrown (java.lang.Exception: Could not compute split, block
input-0-1429168311800 not found) and data stops being calculated.

- The delay shown in the web UI also keeps increasing.

- After some hours disk space is consumed. There are a lot of directories
with names like "/tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c".

The job basically reads information from a kafka topic and calculates
several topN tables for some key and value fields related to netflow data.
Some of the parameters are:
- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16GB RAM, 12 cores), and it is
run as follows:

/opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
--jars $JARS --class $APPCLASS --master local[2] $APPJAR

Does someone have clues about the problem? I don't know if it is a
configuration problem or some error in the code that is causing memory
leaks..

Thank you in advance!
Miquel

PS: the code is basically this:

object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1

  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String, Int]()
  hm += ( "unix_secs"   -> 0 )
  hm += ( "unix_nsecs"  -> 1 )
  hm += ( "sysuptime"   -> 2 )
  hm += ( "exaddr"      -> 3 )
  hm += ( "dpkts"       -> 4 )
  hm += ( "doctets"     -> 5 )
  hm += ( "first"       -> 6 )
  hm += ( "last"        -> 7 )
  hm += ( "engine_type" -> 8 )
  hm += ( "engine_id"   -> 9 )
  hm += ( "srcaddr"     -> 10 )
  hm += ( "dstaddr"     -> 11 )
  hm += ( "nexthop"     -> 12 )
  hm += ( "input"       -> 13 )
  hm += ( "output"      -> 14 )
  hm += ( "srcport"     -> 15 )
  hm += ( "dstport"     -> 16 )
  hm += ( "prot"        -> 17 )
  hm += ( "tos"         -> 18 )
  hm += ( "tcp_flags"   -> 19 )
  hm += ( "src_mask"    -> 20 )
  hm += ( "dst_mask"    -> 21 )
  hm += ( "src_as"      -> 22 )
  hm += ( "dst_as"      -> 23 )

  def getKey (lcamps: Array[String], camp: String): String = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal (lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal (line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  def writeOutput (data: Array[(Long, String)], keycamps_str: String,
                   csvheader: String, valcamp: String, prefix: String) = {

    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)

    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach { t => pw.println(t._2 + "," + t._1) }
    pw.close

    ftmpf.renameTo(f1f)
  }

  def main(
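Miquel's observation that KafkaUtils.createDirectStream behaves better fits the failure pattern: the direct stream has no receiver, so there are no receiver blocks to go missing from the BlockManager. A sketch of the Spark 1.3-era call shape, reusing the post's ssc and topics variables; the "metadata.broker.list" address is an assumption, since the post only configures a ZooKeeper quorum:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-less Kafka stream (Spark 1.3+): offsets are computed per batch
// on the driver, so no receiver blocks can age out and trigger
// "Could not compute split, block ... not found".
// The broker address below is illustrative, not taken from the post.
val kafkaParams = Map("metadata.broker.list" -> "kafkahost:9092")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics.split(",").toSet).map(_._2)
```

Note the direct stream takes a Set of topic names rather than the (topic -> thread count) Map the receiver-based createStream uses.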
RE: Streaming problems running 24x7
Hi Akhil,

Thank you for your response,
I think it is not because of the processing time; in fact the delay is
under 1 second, while the batch interval is 10 seconds... The data volume
is low (10 lines / second).

By the way, I have seen some results changing to this call of KafkaUtils:

KafkaUtils.createDirectStream

CPU usage is low and stable, but memory is slowly increasing... But at
least the process lasts longer.

Best regards,
Miquel

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Thursday, 16 April 2015 12:07
To: González Salgado, Miquel
Cc: user@spark.apache.org
Subject: Re: Streaming problems running 24x7

I used to hit this issue when my processing time exceeded the batch
duration. Here are a few workarounds:

- Use storage level MEMORY_AND_DISK
- Enable WAL and checkpointing

The above two will slow things down a little. If you want low latency,
what you can try is:

- Use storage level MEMORY_ONLY_2 (at least replicates it)
- Tachyon-based off-heap storage (haven't tried this, but will let you know)

And from the Spark 1.3.1 version, they have purged the old WAL and it has
better performance. You could try that also.

On 16 Apr 2015 14:10, "Miquel" <miquel.gonza...@tecsidel.es> wrote:

> Hello,
> I'm finding problems running a spark streaming job for more than a few
> hours (3 or 4). It begins working OK, but it degrades until failure.
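Akhil's two knobs live in different places: the storage level is an argument to the receiver-based createStream call, while the WAL is a conf flag that also requires a checkpoint directory. A sketch under the assumption of a Spark 1.2/1.3-era API, reusing the post's kafpar and topicMap; the checkpoint path is illustrative:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf()
  .setAppName("netflow-topn")
  // Write-ahead log for receiver blocks (Spark 1.2+); only effective
  // together with checkpointing.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("/data/checkpoints") // illustrative path; use a reliable filesystem

// MEMORY_ONLY_2 replicates each received block, Akhil's low-latency option;
// MEMORY_AND_DISK_SER is his safer (slower) alternative.
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafpar, topicMap, StorageLevel.MEMORY_ONLY_2).map(_._2)
```

Replication or the WAL gives lost receiver blocks somewhere to be recovered from, which addresses the "Could not compute split" error directly, at the cost of extra memory or I/O.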
Re: Streaming problems running 24x7
> def main(args: Array[String]) {
>
>   if (args.length < 1) {
>     System.err.println("Usage: NetflowTopn ")
>     System.exit(1)
>   }
>
>   appPath = args(0)
>
>   try {
>     val prop = new Properties()
>     prop.load(new FileInputStream(appPath + "/conf/app.properties"))
>
>     zkQuorum   = prop.getProperty("KAFKA_HOST")
>     group      = prop.getProperty("KAFKA_GROUP")
>     topics     = prop.getProperty("KAFKA_TOPIC")
>     numThreads = prop.getProperty("THREADS").toInt
>   } catch { case e: Exception =>
>     e.printStackTrace()
>     sys.exit(1)
>   }
>
>   val sparkConf = new SparkConf().setAppName("netflow-topn")
>     .set("spark.default.parallelism", "2")
>     .set("spark.rdd.compress", "true")
>     .set("spark.streaming.unpersist", "true")
>     .set("spark.cleaner.ttl", "300")
>
>   val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))
>
>   val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
>   val kafpar = Map[String, String](
>     "zookeeper.connect" -> zkQuorum,
>     "group.id" -> group,
>     "zookeeper.connection.timeout.ms" -> "5000",
>     "auto.commit.interval.ms" -> "6",
>     "auto.offset.reset" -> "largest"
>   )
>
>   val lines = KafkaUtils.createStream[String, String, StringDecoder,
>     StringDecoder](ssc, kafpar, topicMap,
>     StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()
>
>   val ll_keycamps = List( List("srcaddr", "dstaddr")
>                          ,List("dstaddr")
>                          ,List("srcaddr")
>                          ,List("srcport")
>                          ,List("dstport")
>                          ,List("total")
>                         )
>
>   val l_valcamps = List( "doctets"
>                         ,"dpkts"
>                         ,"flows"
>                        )
>
>   for (keycamps <- ll_keycamps) {
>
>     val keycamps_str = keycamps.mkString("-")
>     val csvheader = keycamps.mkString(",") + ",amount"
>
>     for (valcamp <- l_valcamps) {
>
>       val lines2 = lines.map(getKeyVal(_, keycamps, valcamp)).cache()
>
>       lines2.reduceByKeyAndWindow((a: Long, b: Long) => a + b,
>                                   Seconds(60), Seconds(10))
>         .map(_.swap)
>         .transform(_.sortByKey(false))
>         .foreachRDD(rdd => {
>           val data = rdd.take(20)
>           writeOutput(data, keycamps_str, csvheader, valcamp, "DATAWINDOW")
>         })
>
>       lines2.reduceByKey((a: Long, b: Long) => a + b)
>         .map(_.swap)
>         .transform(_.sortByKey(false))
>         .foreachRDD(rdd => {
>           val data = rdd.take(20)
>           writeOutput(data, keycamps_str, csvheader, valcamp, "DATA")
>         })
>     }
>   }
>
>   ssc.start()
>   ssc.awaitTermination()
>
> }
> }

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-problems-running-24x7-tp22518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org