Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count().toString))
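
If it also helps to tell the batches apart, here is a rough sketch of the same
idea using the foreachRDD variant that passes the batch time as well (assuming
the same logs DStream, Impression class and logger as in your job):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    // Sketch only: rdd.count() is an action, so the count is computed on the
    // executors but returned to the driver, and this log line ends up in the
    // driver log rather than in the executor logs.
    logs.filter(_.s_id > 0).foreachRDD { (rdd: RDD[Impression], time: Time) =>
      logger.info("batch " + time + " count: " + rdd.count())
    }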



Thanks
Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

> Hi,
>
> I wrote a simple test job that only does very basic operations. For example:
>
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic ->
> 1)).map(_._2)
>     val logs = lines.flatMap { line =>
>       try {
>         Some(parse(line).extract[Impression])
>       } catch {
>         case _: Exception => None
>       }
>     }
>
>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>       rdd.foreachPartition { iter =>
>         iter.foreach(count => logger.info(count.toString))
>       }
>     }
>
> It receives messages from Kafka, parses the JSON, filters and counts the
> records, and then prints the result to the logs.
>
> Thanks.
>
>
> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Hi Zhang,
>>
>> Could you paste your code in a gist? I'm not sure what you are doing inside
>> the code that is filling up the memory.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Yes, I'm using createStream, but the storageLevel param already defaults to
>>> MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing, and I
>>> don't think Kafka messages would be cached in the driver.
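>>>
>>> (If I wanted to override it, I suppose it would look roughly like this,
>>> just a sketch using the createStream overload that takes a StorageLevel:)
>>>
>>>     import org.apache.spark.storage.StorageLevel
>>>
>>>     // Sketch only: same arguments as in my job, but with an explicit
>>>     // storage level instead of the MEMORY_AND_DISK_SER_2 default.
>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>       Map(topic -> 1), StorageLevel.MEMORY_AND_DISK).map(_._2)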
>>>
>>>
>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Are you using the createStream or createDirectStream API? If it's the
>>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
>>>> slow things down though). Another way would be to try the latter.
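>>>>
>>>> For the direct stream it would be something along these lines (only a
>>>> sketch; the broker list, topic set and StringDecoder type params are
>>>> assumptions, adjust them for your setup):
>>>>
>>>>     import kafka.serializer.StringDecoder
>>>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>>>
>>>>     // Sketch only: with the direct approach there is no receiver, so data
>>>>     // is read from Kafka when each batch is processed instead of being
>>>>     // buffered in memory ahead of time.
>>>>     val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
>>>>     val lines = KafkaUtils.createDirectStream[String, String, StringDecoder,
>>>>       StringDecoder](ssc, kafkaParams, Set(topic)).map(_._2)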
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>
>>>>> Hi Akhil,
>>>>>
>>>>> Thanks for your reply. According to the Streaming tab of the Web UI, the
>>>>> Processing Time is around 400ms and there is no Scheduling Delay, so I
>>>>> suppose it's not the Kafka messages that are eating up the off-heap memory.
>>>>> Or maybe it is, but how can I tell?
>>>>>
>>>>> I googled how to check the off-heap memory usage; there's a tool called
>>>>> pmap, but I don't know how to interpret its results.
>>>>>
>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com
>>>>> > wrote:
>>>>>
>>>>>> After submitting the job, if you do a ps aux | grep spark-submit you can
>>>>>> see all the JVM params. Are you using the high-level consumer (receiver
>>>>>> based) for receiving data from Kafka? In that case, if your throughput is
>>>>>> high and the processing delay exceeds the batch interval, you will hit
>>>>>> these memory issues, as data will keep arriving and being dumped into
>>>>>> memory. You can set the StorageLevel to MEMORY_AND_DISK (but it slows
>>>>>> things down). Another alternative would be to use the low-level Kafka
>>>>>> consumer <https://github.com/dibbhatt/kafka-spark-consumer> or the
>>>>>> non-receiver based directStream
>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>> that ships with Spark.
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 in yarn-cluster mode. I found
>>>>>>> that YARN is killing the driver and executor processes because of
>>>>>>> excessive memory use. Here's what I have tried:
>>>>>>>
>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one young GC every 10s), so
>>>>>>> the extra memory is not used by the heap.
>>>>>>> 2. I set the two memoryOverhead params to 1024 (the default is 384; see
>>>>>>> the sketch after this list), but the memory just keeps growing and then
>>>>>>> hits the limit.
>>>>>>> 3. This problem does not show up in low-throughput jobs, nor in
>>>>>>> standalone mode.
>>>>>>> 4. The test job just receives messages from Kafka with a batch interval
>>>>>>> of 1, does some filtering and aggregation, and then prints to the
>>>>>>> executor logs. So it's not some 3rd-party library that causes the 'leak'.
>>>>>>>
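>>>>>>> For item 2, the two params I mean are the YARN overhead settings. In
>>>>>>> yarn-cluster mode they normally have to be supplied at submit time (e.g.
>>>>>>> --conf on spark-submit or spark-defaults.conf), since the driver
>>>>>>> container is requested before user code runs; the SparkConf form below
>>>>>>> is just a sketch to show the key names.
>>>>>>>
>>>>>>>     import org.apache.spark.SparkConf
>>>>>>>
>>>>>>>     // Sketch only: the memory overhead keys for Spark on YARN, in MB.
>>>>>>>     val conf = new SparkConf()
>>>>>>>       .set("spark.yarn.driver.memoryOverhead", "1024")
>>>>>>>       .set("spark.yarn.executor.memoryOverhead", "1024")
>>>>>>>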
>>>>>>> I built Spark 1.3 myself, with the correct Hadoop versions.
>>>>>>>
>>>>>>> Any ideas will be appreciated.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> --
>>>>>>> Jerry
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jerry
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Jerry
>>>
>>
>>
>
>
> --
> Jerry
>
