Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count().toString))

(Note the .toString — your logger takes a String in the quoted code, and rdd.count() returns a Long.)
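For completeness, here is a minimal sketch of the whole test job with that change applied. The Impression shape, the logger setup, and the Kafka connection values are placeholder assumptions on my side; everything else follows the code quoted below:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.LoggerFactory

object CountTest {
  // Assumed shape of the parsed record; only s_id matters for the filter.
  case class Impression(s_id: Long)

  implicit val formats = DefaultFormats
  val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    // Placeholder connection values; substitute the real ones.
    val (zkQuorum, group, topic) = ("localhost:2181", "test-group", "impressions")

    val ssc = new StreamingContext(new SparkConf().setAppName("CountTest"), Seconds(1))
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
    val logs = lines.flatMap { line =>
      try Some(parse(line).extract[Impression])
      catch { case _: Exception => None } // drop unparseable records
    }

    // count() is an action, so the result comes back to the driver and is
    // logged once per batch in the driver log, rather than once per
    // partition in the executor logs.
    logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count().toString))

    ssc.start()
    ssc.awaitTermination()
  }
}

If the driver log then shows the expected per-batch counts while memory still grows, that narrows the leak down to something other than your counting logic.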
Thanks
Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

> Hi,
>
> I wrote a simple test job that only does very basic operations, for
> example:
>
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
> val logs = lines.flatMap { line =>
>   try {
>     Some(parse(line).extract[Impression])
>   } catch {
>     case _: Exception => None
>   }
> }
>
> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>   rdd.foreachPartition { iter =>
>     iter.foreach(count => logger.info(count.toString))
>   }
> }
>
> It receives messages from Kafka, parses the JSON, filters and counts the
> records, and then prints the count to the logs.
>
> Thanks.
>
>
> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> Hi Zhang,
>>
>> Could you paste your code in a gist? Not sure what you are doing inside
>> the code to fill up memory.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Yes, I'm using createStream, but the storageLevel param already defaults
>>> to MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing,
>>> and I don't think Kafka messages are cached in the driver.
>>>
>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Are you using the createStream or createDirectStream api? If it's the
>>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it
>>>> might slow things down though). Another way would be to try the latter.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>
>>>>> Hi Akhil,
>>>>>
>>>>> Thanks for your reply. According to the Streaming tab of the Web UI,
>>>>> the Processing Time is around 400 ms and there's no Scheduling Delay,
>>>>> so I suppose it's not the Kafka messages that eat up the off-heap
>>>>> memory. Or maybe it is, but how can I tell?
>>>>>
>>>>> I googled how to check off-heap memory usage; there's a tool called
>>>>> pmap, but I don't know how to interpret its output.
>>>>>
>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> After submitting the job, if you do a ps aux | grep spark-submit you
>>>>>> can see all the JVM params. Are you using the high-level consumer
>>>>>> (receiver-based) for receiving data from Kafka? In that case, if your
>>>>>> throughput is high and the processing delay exceeds the batch
>>>>>> interval, you will hit these memory issues, because data keeps being
>>>>>> received and dumped to memory. You can set the StorageLevel to
>>>>>> MEMORY_AND_DISK (but it slows things down). Another alternative is to
>>>>>> use the low-level Kafka consumer
>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or the
>>>>>> non-receiver-based directStream
>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>> that ships with Spark.
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm using Spark Streaming 1.3 on CDH 5.1 in yarn-cluster mode. I've
>>>>>>> found that YARN is killing the driver and executor processes because
>>>>>>> of excessive memory use. Here's what I've tried:
>>>>>>>
>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so
>>>>>>> the extra memory is not used by the heap.
>>>>>>> 2. I set the two memoryOverhead params to 1024 (the default is 384),
>>>>>>> but the memory just keeps growing until it hits the limit.
>>>>>>> 3. The problem does not show up in low-throughput jobs, nor in
>>>>>>> standalone mode.
>>>>>>> 4. The test job just receives messages from Kafka with a batch
>>>>>>> interval of 1, does some filtering and aggregation, and then prints
>>>>>>> to the executor logs, so it's not some 3rd-party library causing the
>>>>>>> 'leak'.
>>>>>>>
>>>>>>> Spark 1.3 was built by myself, with the correct Hadoop version.
>>>>>>>
>>>>>>> Any ideas will be appreciated.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> --
>>>>>>> Jerry
>>>>>>
>>>>>
>>>>> --
>>>>> Jerry
>>>>
>>>
>>> --
>>> Jerry
>>
>
> --
> Jerry
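P.S. On point 2 of the original report, for anyone who finds this thread later: in Spark 1.3 the two settings are spark.yarn.driver.memoryOverhead and spark.yarn.executor.memoryOverhead, both in MB. In yarn-cluster mode they need to be supplied at submit time, along these lines (class and jar names are made up):

spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --class com.example.CountTest \
  count-test.jar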