Hi,

Unfortunately, they're still growing, on both the driver and the executors.
I ran the same job in local mode and everything is fine.
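For reference, here is roughly the direct (receiver-less) variant from the integration guide that I could try next. It's only an untested sketch on my side: the broker list is a placeholder, and the parsing/filtering would stay the same as in the test job quoted below.

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // The direct stream reads from the Kafka brokers without a receiver,
  // so no received blocks are cached on the executors.
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // placeholder brokers
  val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set(topic)).map(_._2)

Since it has no receiver, it would at least rule out receiver-side block caching as the source of the growth.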
On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Can you replace your counting part with this?
>
> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>
> Thanks
> Best Regards
>
> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> I wrote a simple test job that only does very basic operations, for example:
>>
>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
>> val logs = lines.flatMap { line =>
>>   try {
>>     Some(parse(line).extract[Impression])
>>   } catch {
>>     case _: Exception => None
>>   }
>> }
>>
>> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>   rdd.foreachPartition { iter =>
>>     iter.foreach(count => logger.info(count.toString))
>>   }
>> }
>>
>> It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the counts to the logs.
>>
>> Thanks.
>>
>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Hi Zhang,
>>>
>>> Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, I'm using createStream, but the storageLevel param is MEMORY_AND_DISK_SER_2 by default. Besides, the driver's memory is also growing, and I don't think Kafka messages would be cached in the driver.
>>>>
>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> Are you using the createStream or the createDirectStream API? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down though). Another way would be to try the latter.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Akhil,
>>>>>>
>>>>>> Thanks for your reply. According to the Streaming tab of the Web UI, the Processing Time is around 400ms and there's no Scheduling Delay, so I suppose it's not the Kafka messages that eat up the off-heap memory. Or maybe it is, but how can I tell?
>>>>>>
>>>>>> I googled how to check off-heap memory usage; there's a tool called pmap, but I don't know how to interpret the results.
>>>>>>
>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit you can see all the JVM params. Are you using the high-level consumer (receiver based) for receiving data from Kafka? In that case, if your throughput is high and the processing delay exceeds the batch interval, you will hit these memory issues, as data keeps being received and dumped to memory. You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down). Another alternative is to use the low-level Kafka consumer <https://github.com/dibbhatt/kafka-spark-consumer> or the non-receiver-based directStream <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers> that comes with Spark.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm using Spark Streaming 1.3 on CDH 5.1 in yarn-cluster mode. I find that YARN is killing the driver and executor processes because of excessive memory use. Here's what I have tried:
>>>>>>>>
>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the extra memory is not used by the heap.
>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is 384), but the memory just keeps growing until it hits the limit.
>>>>>>>> 3. The problem does not show up in low-throughput jobs, nor in standalone mode.
>>>>>>>> 4. The test job just receives messages from Kafka with a batch interval of 1, does some filtering and aggregation, and then prints to the executor logs, so it's not some 3rd-party library causing the 'leak'.
>>>>>>>>
>>>>>>>> I built Spark 1.3 myself, with the correct Hadoop versions.
>>>>>>>>
>>>>>>>> Any ideas will be appreciated.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jerry

--
Jerry