Hi,

Unfortunately, memory usage is still growing, in both the driver and the executors.

When I run the same job in local mode, everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Can you replace your counting part with this?
>
> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count().toString))
>
>
>
> Thanks
> Best Regards
>
> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> I wrote a simple test job that only does very basic operations, for
>> example:
>>
>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>       Map(topic -> 1)).map(_._2)
>>     val logs = lines.flatMap { line =>
>>       try {
>>         Some(parse(line).extract[Impression])
>>       } catch {
>>         case _: Exception => None
>>       }
>>     }
>>
>>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>       rdd.foreachPartition { iter =>
>>         iter.foreach(count => logger.info(count.toString))
>>       }
>>     }
>>
>> It receives messages from Kafka, parses the JSON, filters and counts the
>> records, and then prints the count to the logs.
>>
>> Thanks.
>>
>>
>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Hi Zhang,
>>>
>>> Could you paste your code in a gist? I'm not sure what you're doing inside
>>> the code that fills up the memory.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, I'm using createStream, but the storageLevel param already defaults
>>>> to MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing,
>>>> and I don't think Kafka messages are cached in the driver.
>>>>
>>>>
>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com
>>>> > wrote:
>>>>
>>>>> Are you using the createStream or createDirectStream API? If it's the
>>>>> former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
>>>>> slow things down though). Another way would be to try the latter.
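>>>>>
>>>>> A minimal sketch of the first option (ssc, zkQuorum, group, and topic are
>>>>> placeholders for whatever you already pass to createStream):
>>>>>
>>>>>     import org.apache.spark.storage.StorageLevel
>>>>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>
>>>>>     // Receiver-based stream with an explicit, non-serialized,
>>>>>     // spill-to-disk storage level instead of the default
>>>>>     // MEMORY_AND_DISK_SER_2.
>>>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>       Map(topic -> 1), StorageLevel.MEMORY_AND_DISK).map(_._2)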
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Akhil,
>>>>>>
>>>>>> Thanks for your reply. According to the Streaming tab of the Web UI, the
>>>>>> Processing Time is around 400ms and there's no Scheduling Delay, so I
>>>>>> suppose it's not the Kafka messages that are eating up the off-heap
>>>>>> memory. Or maybe it is, but how can I tell?
>>>>>>
>>>>>> I googled how to check off-heap memory usage and found a tool called
>>>>>> pmap, but I don't know how to interpret the results.
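>>>>>>
>>>>>> In case it helps, one partial way to peek at off-heap usage from inside
>>>>>> the JVM is the BufferPoolMXBean; it only covers NIO direct/mapped
>>>>>> buffers, not all native allocations, but those are a common culprit. A
>>>>>> rough sketch:
>>>>>>
>>>>>>     import java.lang.management.{BufferPoolMXBean, ManagementFactory}
>>>>>>     import scala.collection.JavaConverters._
>>>>>>
>>>>>>     // Prints the JVM's NIO buffer pool usage ("direct" and "mapped").
>>>>>>     // Call it on the driver, or inside rdd.foreachPartition to sample
>>>>>>     // an executor.
>>>>>>     def printBufferPools(): Unit =
>>>>>>       ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean])
>>>>>>         .asScala.foreach { p =>
>>>>>>           println(s"${p.getName}: used=${p.getMemoryUsed} bytes, " +
>>>>>>             s"capacity=${p.getTotalCapacity}, count=${p.getCount}")
>>>>>>         }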
>>>>>>
>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit you
>>>>>>> can see all the JVM params. Are you using the high-level consumer
>>>>>>> (receiver based) for receiving data from Kafka? In that case, if your
>>>>>>> throughput is high and the processing delay exceeds the batch interval,
>>>>>>> you will hit these memory issues, as data keeps being received and
>>>>>>> dumped into memory. You can set the StorageLevel to MEMORY_AND_DISK
>>>>>>> (but it slows things down). Another alternative would be to use the
>>>>>>> low-level Kafka consumer
>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or the non-receiver
>>>>>>> based directStream
>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>> that ships with Spark.
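>>>>>>>
>>>>>>> The directStream usage is roughly as follows (ssc is your
>>>>>>> StreamingContext, topic the topic name, and brokers a placeholder for
>>>>>>> your Kafka broker host:port list, which replaces the ZooKeeper quorum):
>>>>>>>
>>>>>>>     import kafka.serializer.StringDecoder
>>>>>>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>
>>>>>>>     // Direct (receiver-less) stream: data is read from Kafka when each
>>>>>>>     // batch is computed instead of being buffered by a receiver.
>>>>>>>     // 'brokers' is a placeholder for your host1:port1,host2:port2 list.
>>>>>>>     val kafkaParams = Map("metadata.broker.list" -> brokers)
>>>>>>>     val lines = KafkaUtils.createDirectStream[String, String,
>>>>>>>       StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)).map(_._2)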
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm using Spark Streaming 1.3 on CDH 5.1 in yarn-cluster mode. I've
>>>>>>>> found that YARN is killing the driver and executor processes because
>>>>>>>> of excessive memory use. Here's what I've tried:
>>>>>>>>
>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one young GC per 10s), so
>>>>>>>> the extra memory is not used by the heap.
>>>>>>>> 2. I set the two memoryOverhead params to 1024 (the default is 384),
>>>>>>>> but the memory just keeps growing until it hits the limit (a sketch of
>>>>>>>> how I set them follows below this list).
>>>>>>>> 3. The problem does not show up in low-throughput jobs, nor in
>>>>>>>> standalone mode.
>>>>>>>> 4. The test job just receives messages from Kafka with a batch interval
>>>>>>>> of 1, does some filtering and aggregation, and then prints to the
>>>>>>>> executor logs, so it's not some third-party library that causes the
>>>>>>>> 'leak'.
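>>>>>>>>
>>>>>>>> For reference, a minimal sketch of the overhead settings from item 2;
>>>>>>>> since this is yarn-cluster mode I actually supply them at submit time
>>>>>>>> (via --conf or spark-defaults.conf) so they apply before the containers
>>>>>>>> are sized:
>>>>>>>>
>>>>>>>>     import org.apache.spark.SparkConf
>>>>>>>>
>>>>>>>>     // Equivalent to passing
>>>>>>>>     //   --conf spark.yarn.driver.memoryOverhead=1024
>>>>>>>>     //   --conf spark.yarn.executor.memoryOverhead=1024
>>>>>>>>     // on the spark-submit command line.
>>>>>>>>     val conf = new SparkConf()
>>>>>>>>       .set("spark.yarn.driver.memoryOverhead", "1024")
>>>>>>>>       .set("spark.yarn.executor.memoryOverhead", "1024")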
>>>>>>>>
>>>>>>>> I built Spark 1.3 myself, with the correct Hadoop versions.
>>>>>>>>
>>>>>>>> Any ideas will be appreciated.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jerry
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jerry
>>>>
>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry
