Re: Spark Thrift Server Concurrency

2016-06-27 Thread Prabhu Joseph
Spark Thrift Server is started with

./sbin/start-thriftserver.sh --master yarn-client --hiveconf
hive.server2.thrift.port=10001 --num-executors 4 --executor-cores 2
--executor-memory 4G --conf spark.scheduler.mode=FAIR

The query below is executed 20 times in parallel:

select distinct val2 from philips1 where key>=1000 and key<=1500

There is no issue at the backend Spark executors: the Spark jobs UI shows
all 20 queries launched and completed with the same duration, and all 20
queries are received by the Spark Thrift Server at the same time. But the
Spark driver running inside the Spark Thrift Server looks overloaded, so the
queries are not parsed and submitted to the executors at the same time, which
is why the client sees a delay in query execution time.
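
A stand-alone sketch (not the Thrift Server itself; the table name is a
stand-in) of 20 threads sharing one driver-side SQLContext, separating the
time spent building the query on the driver from executor-side execution:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DriverConcurrencyProbe {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("driver-concurrency-probe"))
    val sqlContext = new SQLContext(sc)
    sqlContext.range(0L, 2000000L).registerTempTable("probe")   // stand-in for philips1

    implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(20))
    val runs = (1 to 20).map { i =>
      Future {
        val t0 = System.currentTimeMillis()
        // sql() parses and analyzes on the single shared driver
        val df = sqlContext.sql("select distinct id from probe where id >= 1000 and id <= 1500")
        val t1 = System.currentTimeMillis()
        df.collect()                      // planning + execution on executors
        val t2 = System.currentTimeMillis()
        s"query $i: sql()=${t1 - t0}ms collect()=${t2 - t1}ms"
      }
    }
    Await.result(Future.sequence(runs), 10.minutes).foreach(println)
    sc.stop()
  }
}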





On Thu, Jun 23, 2016 at 11:12 PM, Michael Segel <msegel_had...@hotmail.com>
wrote:

> Hi,
> There are  a lot of moving parts and a lot of unknowns from your
> description.
> Besides the version stuff.
>
> How many executors, how many cores? How much memory?
> Are you persisting (memory and disk) or just caching (memory)
>
> During the execution… same tables… are  you seeing a lot of shuffling of
> data for some queries and not others?
>
> It sounds like an interesting problem…
>
> On Jun 23, 2016, at 5:21 AM, Prabhu Joseph <prabhujose.ga...@gmail.com>
> wrote:
>
> Hi All,
>
>On submitting the same SQL query 20 times in parallel to the Spark Thrift
> Server, the query execution time for some queries is less than a second and
> for some it is more than 2 seconds. The Spark Thrift Server logs show all 20
> queries submitted at the same time, 16/06/23 12:12:01, but the result
> schemas appear at different times.
>
> 16/06/23 12:12:01 INFO SparkExecuteStatementOperation: Running query
> 'select distinct val2 from philips1 where key>=1000 and key<=1500
>
> 16/06/23 12:12:02 INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2110)
> 16/06/23 12:12:03 INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2182)
> 16/06/23 12:12:04 INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2344)
> 16/06/23 12:12:05 INFO SparkExecuteStatementOperation: Result Schema:
> ArrayBuffer(val2#2362)
>
> There are sufficient executors running on YARN. The concurrency is
> limited by the single driver. How can the concurrency be improved, and what
> are the best practices?
>
> Thanks,
> Prabhu Joseph
>
>
>


Spark Thrift Server Concurrency

2016-06-23 Thread Prabhu Joseph
Hi All,

   On submitting the same SQL query 20 times in parallel to the Spark Thrift
Server, the query execution time for some queries is less than a second and
for some it is more than 2 seconds. The Spark Thrift Server logs show all 20
queries submitted at the same time, 16/06/23 12:12:01, but the result schemas
appear at different times.

16/06/23 12:12:01 INFO SparkExecuteStatementOperation: Running query
'select distinct val2 from philips1 where key>=1000 and key<=1500

16/06/23 12:12:02 INFO SparkExecuteStatementOperation: Result Schema:
ArrayBuffer(val2#2110)
16/06/23 12:12:03 INFO SparkExecuteStatementOperation: Result Schema:
ArrayBuffer(val2#2182)
16/06/23 12:12:04 INFO SparkExecuteStatementOperation: Result Schema:
ArrayBuffer(val2#2344)
16/06/23 12:12:05 INFO SparkExecuteStatementOperation: Result Schema:
ArrayBuffer(val2#2362)

There are sufficient executors running on YARN. The concurrency is limited
by the single driver. How can the concurrency be improved, and what are the
best practices?

Thanks,
Prabhu Joseph


Improving Spark Scheduler Delay

2016-03-19 Thread Prabhu Joseph
Hi All,

On running concurrent Spark jobs (with a huge number of tasks) on the same
SparkContext, there is high scheduler delay. We have the FAIR scheduling
policy set, and we also tried a different pool for each job, but there is
still no improvement. What are the tuning options to improve scheduler delay?
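
For reference, a minimal sketch of per-thread pool assignment with the FAIR
scheduler; the pool names and allocation-file path are assumptions, and this
only shows the mechanism (sc.setLocalProperty must be called on each
submitting thread), not a fix for scheduler delay itself:

import org.apache.spark.{SparkConf, SparkContext}

object FairPoolSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fair-pool-sketch")
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // hypothetical path
    val sc = new SparkContext(conf)

    // Jobs inherit the pool set as a local property on the submitting thread.
    def submit(pool: String)(body: => Unit): Thread = {
      val t = new Thread(new Runnable {
        def run(): Unit = {
          sc.setLocalProperty("spark.scheduler.pool", pool)
          body
        }
      })
      t.start(); t
    }

    val a = submit("poolA") { sc.parallelize(1 to 1000000, 200).map(_ * 2).count() }
    val b = submit("poolB") { sc.parallelize(1 to 1000000, 200).filter(_ % 2 == 0).count() }
    a.join(); b.join()
    sc.stop()
  }
}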

Thanks,
Prabhu Joseph


Re: Spark UI Completed Jobs

2016-03-15 Thread Prabhu Joseph
Thanks Mark and Jeff
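
A minimal sketch that reproduces the "skipped" accounting explained below:
nothing is cached, yet the second action reuses the first action's shuffle
files, so its map stage is reported as skipped in the UI.

import org.apache.spark.{SparkConf, SparkContext}

object SkippedStagesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("skipped-stages-sketch"))

    // A shuffle (reduceByKey) materializes map-output files on the executors.
    val counts = sc.parallelize(1 to 1000000, 100)
      .map(x => (x % 1000, 1))
      .reduceByKey(_ + _)          // note: no cache() or persist() anywhere

    counts.count()  // Job 0: runs the map stage and the reduce stage
    counts.count()  // Job 1: the map stage appears as "skipped", because its
                    // shuffle output is still known to the map output tracker
    sc.stop()
  }
}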

On Wed, Mar 16, 2016 at 7:11 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> Looks to me like the one remaining Stage would execute 19788 Tasks if all
> of those Tasks succeeded on the first try; but because of retries, 19841
> Tasks were actually executed.  Meanwhile, there were 41405 Tasks in the
> 163 Stages that were skipped.
>
> I think -- but the Spark UI's accounting may not be 100% accurate and bug
> free.
>
> On Tue, Mar 15, 2016 at 6:34 PM, Prabhu Joseph <prabhujose.ga...@gmail.com
> > wrote:
>
>> Okay, so out of 164 stages, 163 are skipped. And how are 41405 tasks
>> skipped if the total is only 19788?
>>
>> On Wed, Mar 16, 2016 at 6:31 AM, Mark Hamstra <m...@clearstorydata.com>
>> wrote:
>>
>>> It's not just if the RDD is explicitly cached, but also if the map
>>> outputs for stages have been materialized into shuffle files and are still
>>> accessible through the map output tracker.  Because of that, explicitly
>>> caching RDD actions often gains you little or nothing, since even without a
>>> call to cache() or persist() the prior computation will largely be reused
>>> and stages will show up as skipped -- i.e. no need to recompute that stage.
>>>
>>> On Tue, Mar 15, 2016 at 5:50 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> If RDD is cached, this RDD is only computed once and the stages for
>>>> computing this RDD in the following jobs are skipped.
>>>>
>>>>
>>>> On Wed, Mar 16, 2016 at 8:14 AM, Prabhu Joseph <
>>>> prabhujose.ga...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>>
>>>>> The Spark UI Completed Jobs section shows the information below. What is
>>>>> the skipped value shown for Stages and Tasks?
>>>>>
>>>>> Job_ID | Description | Submitted | Duration | Stages (Succeeded/Total) |
>>>>> Tasks (for all stages): Succeeded/Total
>>>>>
>>>>> 11 | count | 2016/03/14 15:35:32 | 1.4 min | 164/164 (163 skipped) |
>>>>> 19841/19788 (41405 skipped)
>>>>> Thanks,
>>>>> Prabhu Joseph
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>
>


Re: Spark UI Completed Jobs

2016-03-15 Thread Prabhu Joseph
Okay, so out of 164 stages, 163 are skipped. And how are 41405 tasks
skipped if the total is only 19788?

On Wed, Mar 16, 2016 at 6:31 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> It's not just if the RDD is explicitly cached, but also if the map outputs
> for stages have been materialized into shuffle files and are still
> accessible through the map output tracker.  Because of that, explicitly
> caching RDD actions often gains you little or nothing, since even without a
> call to cache() or persist() the prior computation will largely be reused
> and stages will show up as skipped -- i.e. no need to recompute that stage.
>
> On Tue, Mar 15, 2016 at 5:50 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> If RDD is cached, this RDD is only computed once and the stages for
>> computing this RDD in the following jobs are skipped.
>>
>>
>> On Wed, Mar 16, 2016 at 8:14 AM, Prabhu Joseph <
>> prabhujose.ga...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>>
>>> The Spark UI Completed Jobs section shows the information below. What is
>>> the skipped value shown for Stages and Tasks?
>>>
>>> Job_ID | Description | Submitted | Duration | Stages (Succeeded/Total) |
>>> Tasks (for all stages): Succeeded/Total
>>>
>>> 11 | count | 2016/03/14 15:35:32 | 1.4 min | 164/164 (163 skipped) |
>>> 19841/19788 (41405 skipped)
>>> Thanks,
>>> Prabhu Joseph
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


Spark UI Completed Jobs

2016-03-15 Thread Prabhu Joseph
Hi All,


The Spark UI Completed Jobs section shows the information below. What is the
skipped value shown for Stages and Tasks?

Job_ID | Description | Submitted | Duration | Stages (Succeeded/Total) |
Tasks (for all stages): Succeeded/Total

11 | count | 2016/03/14 15:35:32 | 1.4 min | 164/164 (163 skipped) |
19841/19788 (41405 skipped)
Thanks,
Prabhu Joseph


Re: Launch Spark shell using different python version

2016-03-15 Thread Prabhu Joseph
Hi Stuti,

 You can try local mode, but not Spark master or YARN mode, if Python 2.7
is not installed on all Spark Worker / NodeManager machines. To run in
master mode:

   1. Check whether the user is able to access python2.7.
   2. Check that you have installed Python 2.7 on all NodeManager machines /
Spark Worker machines and restarted them.

  An executor running inside a Spark Worker is able to get the full path of
python2.7. But inside a NodeManager, the executor does not find python2.7
even though the script is in PATH. To make the NodeManager find the path, set
the full path of Python 2.7 like below in the pyspark script.

   DEFAULT_PYTHON="/ANACONDA/anaconda2/bin/python2.7"

Thanks,
Prabhu Joseph



On Tue, Mar 15, 2016 at 11:52 AM, Stuti Awasthi <stutiawas...@hcl.com>
wrote:

> Hi All,
>
>
>
> I have a CentOS cluster (without any sudo permissions) which has Python 2.6
> by default. Now I have installed Python 2.7 for my user account and made the
> changes in bashrc so that Python 2.7 is picked up by default. Then I set the
> following properties in bashrc in order to launch the Spark shell using
> Python 2.7, but it's not working.
>
>
>
> Bashrc details :
>
> alias python='/home/stuti/Python/bin/python2.7'
>
> alias python2='/home/stuti/Python/bin/python2.7'
>
> export PYSPARK_PYTHON=/home/stuti/Python/bin/python2.7
>
> export LD_LIBRARY_PATH=/home/stuti/Python/lib:$LD_LIBRARY_PATH
>
> export PATH=$HOME/bin:$PATH
>
>
>
> Also note that the Spark cluster is configured with a different user
> account, and I have not installed Python 2.7 on all the nodes in the
> cluster as I don't have the required permissions.
>
> So is there any way that I can launch my Spark shell using Python 2.7?
>
> Please suggest
>
>
>
> Thanks 
>
> Stuti Awasthi
>
>
>
>
>
>


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Prabhu Joseph
Michael,

Thanks for the recommendation. But can you share what improvements were
made after Spark 1.2.1, and how do they specifically address the issue
observed here?



On Tue, Mar 15, 2016 at 12:03 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> I am not sure about this. At least Hortonworks provides its distribution
> with Hive and Spark 1.6
>
> On 14 Mar 2016, at 09:25, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> I think the only version of Spark that works OK with Hive (Hive on Spark
> engine) is version 1.3.1. I also get OOM from time to time and have to
> revert using MR
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 14 March 2016 at 08:06, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> Which version of Spark are you using? The configuration varies by version.
>>
>> Regards
>> Sab
>>
>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <
>> prabhujose.ga...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> A Hive Join query which runs fine and faster in MapReduce takes lot of
>>> time with Spark and finally fails with OOM.
>>>
>>> *Query:  hivejoin.py*
>>>
>>> from pyspark import SparkContext, SparkConf
>>> from pyspark.sql import HiveContext
>>> conf = SparkConf().setAppName("Hive_Join")
>>> sc = SparkContext(conf=conf)
>>> hiveCtx = HiveContext(sc)
>>> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a INNER
>>> JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
>>> a.instance_id");
>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>> print results
>>>
>>>
>>> *Data Study:*
>>>
>>> Number of Rows:
>>>
>>> A table has 1002093508
>>> B table has5371668
>>> C table has  1000
>>>
>>> No Data Skewness:
>>>
>>> item_id in B is unique and A has multiple rows with same item_id, so
>>> after first INNER_JOIN the result set is same 1002093508 rows
>>>
>>> instance_id in C is unique and A has multiple rows with same instance_id
>>> (maximum count of number of rows with same instance_id is 250)
>>>
>>> Spark Job runs with 90 Executors each with 2cores and 6GB memory. YARN
>>> has allotted all the requested resource immediately and no other job is
>>> running on the
>>> cluster.
>>>
>>> spark.storage.memoryFraction 0.6
>>> spark.shuffle.memoryFraction 0.2
>>>
>>> Stage 2 - reads data from Hadoop, Tasks has NODE_LOCAL and shuffle write
>>> 500GB of intermediate data
>>>
>>> Stage 3 - does shuffle read of 500GB data, tasks has PROCESS_LOCAL and
>>> output of 400GB is shuffled
>>>
>>> Stage 4 - tasks fails with OOM on reading the shuffled output data when
>>> it reached 40GB data itself
>>>
>>> First of all, what kind of Hive queries when run on Spark gets a better
>>> performance than Mapreduce. And what are the hive queries that won't perform
>>> well in Spark.
>>>
>>> How to calculate the optimal Heap for Executor Memory and the number of
>>> executors for given input data size. We don't specify Spark Executors to
>>> cache any data. But how come Stage 3 tasks says PROCESS_LOCAL. Why Stage 4
>>> is failing immediately
>>> when it has just read 40GB data, is it caching data in Memory.
>>>
>>> And in a Spark job, some stage will need lot of memory for shuffle and
>>> some need lot of memory for cache. So, when a Spark Executor has lot of
>>> memory available
>>> for cache and does not use the cache but when there is a need to do lot
>>> of shuffle, will executors only use the shuffle fraction which is set for
>>> doing shuffle or will it use
>>> the free memory available for cache as well.
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Architect - Big Data
>> Ph: +91 99805 99458
>>
>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>> Sullivan India ICT)*
>> +++
>>
>
>


Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Prabhu Joseph
The issue is that the query hits OOM on a stage when reading shuffle output
from the previous stage. How does increasing the shuffle memory help to avoid
the OOM?

On Mon, Mar 14, 2016 at 2:28 PM, Sabarish Sasidharan <sabarish@gmail.com
> wrote:

> Thats a pretty old version of Spark SQL. It is devoid of all the
> improvements introduced in the last few releases.
>
> You should try bumping your spark.sql.shuffle.partitions to a value
> higher than default (5x or 10x). Also increase your shuffle memory fraction
> as you really are not explicitly caching anything. You could simply swap
> the fractions in your case.
>
> Regards
> Sab
>
> On Mon, Mar 14, 2016 at 2:20 PM, Prabhu Joseph <prabhujose.ga...@gmail.com
> > wrote:
>
>> It is a Spark-SQL and the version used is Spark-1.2.1.
>>
>> On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> I believe the OP is using Spark SQL and not Hive on Spark.
>>>
>>> Regards
>>> Sab
>>>
>>> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> I think the only version of Spark that works OK with Hive (Hive on
>>>> Spark engine) is version 1.3.1. I also get OOM from time to time and have
>>>> to revert using MR
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 14 March 2016 at 08:06, Sabarish Sasidharan <
>>>> sabarish.sasidha...@manthan.com> wrote:
>>>>
>>>>> Which version of Spark are you using? The configuration varies by
>>>>> version.
>>>>>
>>>>> Regards
>>>>> Sab
>>>>>
>>>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <
>>>>> prabhujose.ga...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> A Hive Join query which runs fine and faster in MapReduce takes lot
>>>>>> of time with Spark and finally fails with OOM.
>>>>>>
>>>>>> *Query:  hivejoin.py*
>>>>>>
>>>>>> from pyspark import SparkContext, SparkConf
>>>>>> from pyspark.sql import HiveContext
>>>>>> conf = SparkConf().setAppName("Hive_Join")
>>>>>> sc = SparkContext(conf=conf)
>>>>>> hiveCtx = HiveContext(sc)
>>>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a
>>>>>> INNER JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
>>>>>> a.instance_id");
>>>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>>>> print results
>>>>>>
>>>>>>
>>>>>> *Data Study:*
>>>>>>
>>>>>> Number of Rows:
>>>>>>
>>>>>> A table has 1002093508
>>>>>> B table has5371668
>>>>>> C table has  1000
>>>>>>
>>>>>> No Data Skewness:
>>>>>>
>>>>>> item_id in B is unique and A has multiple rows with same item_id, so
>>>>>> after first INNER_JOIN the result set is same 1002093508 rows
>>>>>>
>>>>>> instance_id in C is unique and A has multiple rows with same
>>>>>> instance_id (maximum count of number of rows with same instance_id is 
>>>>>> 250)
>>>>>>
>>>>>> Spark Job runs with 90 Executors each with 2cores and 6GB memory.
>>>>>> YARN has allotted all the requested resource immediately and no other job
>>>>>> is running on the
>>>>>> cluster.
>>>>>>
>>>>>> spark.storage.memoryFraction 0.6
>>>>>> spark.shuffle.memoryFraction 0.2
>>>>>>
>>>>>> Stage 2 - reads data from Hadoop, Tasks has NODE_LOCAL and shuffle
>>>>>> write 500GB of intermediate data
>>>>>>
>>>>>> Stage 3 - does

Re: Hive Query on Spark fails with OOM

2016-03-14 Thread Prabhu Joseph
It is Spark SQL, and the version used is Spark 1.2.1.

On Mon, Mar 14, 2016 at 2:16 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> I believe the OP is using Spark SQL and not Hive on Spark.
>
> Regards
> Sab
>
> On Mon, Mar 14, 2016 at 1:55 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> I think the only version of Spark that works OK with Hive (Hive on Spark
>> engine) is version 1.3.1. I also get OOM from time to time and have to
>> revert using MR
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 14 March 2016 at 08:06, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> Which version of Spark are you using? The configuration varies by
>>> version.
>>>
>>> Regards
>>> Sab
>>>
>>> On Mon, Mar 14, 2016 at 10:53 AM, Prabhu Joseph <
>>> prabhujose.ga...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> A Hive Join query which runs fine and faster in MapReduce takes lot of
>>>> time with Spark and finally fails with OOM.
>>>>
>>>> *Query:  hivejoin.py*
>>>>
>>>> from pyspark import SparkContext, SparkConf
>>>> from pyspark.sql import HiveContext
>>>> conf = SparkConf().setAppName("Hive_Join")
>>>> sc = SparkContext(conf=conf)
>>>> hiveCtx = HiveContext(sc)
>>>> hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a
>>>> INNER JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
>>>> a.instance_id");
>>>> results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
>>>> print results
>>>>
>>>>
>>>> *Data Study:*
>>>>
>>>> Number of Rows:
>>>>
>>>> A table has 1002093508
>>>> B table has5371668
>>>> C table has  1000
>>>>
>>>> No Data Skewness:
>>>>
>>>> item_id in B is unique and A has multiple rows with same item_id, so
>>>> after first INNER_JOIN the result set is same 1002093508 rows
>>>>
>>>> instance_id in C is unique and A has multiple rows with same
>>>> instance_id (maximum count of number of rows with same instance_id is 250)
>>>>
>>>> Spark Job runs with 90 Executors each with 2cores and 6GB memory. YARN
>>>> has allotted all the requested resource immediately and no other job is
>>>> running on the
>>>> cluster.
>>>>
>>>> spark.storage.memoryFraction 0.6
>>>> spark.shuffle.memoryFraction 0.2
>>>>
>>>> Stage 2 - reads data from Hadoop, Tasks has NODE_LOCAL and shuffle
>>>> write 500GB of intermediate data
>>>>
>>>> Stage 3 - does shuffle read of 500GB data, tasks has PROCESS_LOCAL and
>>>> output of 400GB is shuffled
>>>>
>>>> Stage 4 - tasks fails with OOM on reading the shuffled output data when
>>>> it reached 40GB data itself
>>>>
>>>> First of all, what kind of Hive queries when run on Spark gets a better
>>>> performance than Mapreduce. And what are the hive queries that won't 
>>>> perform
>>>> well in Spark.
>>>>
>>>> How to calculate the optimal Heap for Executor Memory and the number of
>>>> executors for given input data size. We don't specify Spark Executors to
>>>> cache any data. But how come Stage 3 tasks says PROCESS_LOCAL. Why Stage 4
>>>> is failing immediately
>>>> when it has just read 40GB data, is it caching data in Memory.
>>>>
>>>> And in a Spark job, some stage will need lot of memory for shuffle and
>>>> some need lot of memory for cache. So, when a Spark Executor has lot of
>>>> memory available
>>>> for cache and does not use the cache but when there is a need to do lot
>>>> of shuffle, will executors only use the shuffle fraction which is set for
>>>> doing shuffle or will it use
>>>> the free memory available for cache as well.
>>>>
>>>>
>>>> Thanks,
>>>> Prabhu Joseph
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> Architect - Big Data
>>> Ph: +91 99805 99458
>>>
>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>> Sullivan India ICT)*
>>> +++
>>>
>>
>>
>
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>


Hive Query on Spark fails with OOM

2016-03-13 Thread Prabhu Joseph
Hi All,

A Hive join query which runs fine and fast in MapReduce takes a lot of time
with Spark and finally fails with OOM.

Query: hivejoin.py

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf().setAppName("Hive_Join")
sc = SparkContext(conf=conf)
hiveCtx = HiveContext(sc)
hiveCtx.hql("INSERT OVERWRITE TABLE D select <80 columns> from A a INNER
JOIN B b ON a.item_id = b.item_id LEFT JOIN C c ON c.instance_id =
a.instance_id");
results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
print results


Data Study:

Number of Rows:

A table has 1002093508
B table has 5371668
C table has 1000

No Data Skewness:

item_id in B is unique and A has multiple rows with the same item_id, so
after the first INNER JOIN the result set is the same 1002093508 rows

instance_id in C is unique and A has multiple rows with the same instance_id
(the maximum number of rows with the same instance_id is 250)

The Spark job runs with 90 executors, each with 2 cores and 6GB memory. YARN
allotted all the requested resources immediately, and no other job is running
on the cluster.

spark.storage.memoryFraction 0.6
spark.shuffle.memoryFraction 0.2

Stage 2 - reads data from Hadoop; tasks have NODE_LOCAL locality and
shuffle-write 500GB of intermediate data

Stage 3 - does a shuffle read of the 500GB of data; tasks have PROCESS_LOCAL
locality and 400GB of output is shuffled

Stage 4 - tasks fail with OOM while reading the shuffled output, after
reading just 40GB of it

First of all, what kinds of Hive queries get better performance when run on
Spark than on MapReduce, and what are the Hive queries that won't perform
well in Spark?

How do we calculate the optimal heap for executor memory and the number of
executors for a given input data size? We don't ask the Spark executors to
cache any data, so how come the Stage 3 tasks say PROCESS_LOCAL? Why is
Stage 4 failing immediately when it has read just 40GB of data? Is it caching
data in memory?

In a Spark job, some stages need a lot of memory for shuffle and some need a
lot of memory for cache. So, when a Spark executor has a lot of memory
available for cache but does not use it, and there is a need to do a lot of
shuffle, will the executors only use the shuffle fraction that is set for the
shuffle, or will they use the free memory available for cache as well?
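
For context, a hedged sketch of how the two fractions above (Spark 1.2.x
settings, replaced by unified memory management in later releases) and the
SQL shuffle partition count could be adjusted when nothing is explicitly
cached; the exact values are illustrative only:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Hive_Join")
  // Nothing is cached, so give the shuffle the larger share
  // (effectively swapping the two fractions listed above).
  .set("spark.storage.memoryFraction", "0.2")
  .set("spark.shuffle.memoryFraction", "0.6")
  // More, smaller reduce partitions so each task handles less shuffled data at once.
  .set("spark.sql.shuffle.partitions", "2000")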


Thanks,
Prabhu Joseph


Re: NullPointerException

2016-03-11 Thread Prabhu Joseph
Looking at ExternalSorter.scala line 192, I suspect some input record has a
null key.

189    while (records.hasNext) {
190      addElementsRead()
191      kv = records.next()
192      map.changeValue((getPartition(kv._1), kv._1), update)
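
If the null-key suspicion is right, a quick way to confirm it is to count and
drop null keys before the shuffle. A minimal sketch with a synthetic pair RDD
(the real key and value types here are unknown):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions on Spark < 1.3

object NullKeyCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("null-key-check"))

    // Synthetic stand-in for the real input records.
    val records = sc.parallelize(Seq(("a", 1), (null.asInstanceOf[String], 2), ("b", 3)))

    // How many records would hand a null key to getPartition(kv._1)?
    println("records with null key: " + records.filter(_._1 == null).count())

    // Drop (or remap) them before the stage that shuffles.
    records.filter(_._1 != null).reduceByKey(_ + _).collect().foreach(println)

    sc.stop()
  }
}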



On Sat, Mar 12, 2016 at 12:48 PM, Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> Looking at ExternalSorter.scala line 192
>
> 189    while (records.hasNext) {
>          addElementsRead()
>          kv = records.next()
>          map.changeValue((getPartition(kv._1), kv._1), update)
>          maybeSpillCollection(usingMap = true)
>        }
>
> On Sat, Mar 12, 2016 at 12:31 PM, Saurabh Guru <saurabh.g...@gmail.com>
> wrote:
>
>> I am seeing the following exception in my Spark Cluster every few days in
>> production.
>>
>> 2016-03-12 05:30:00,541 - WARN  TaskSetManager - Lost task 0.0 in stage
>> 12528.0 (TID 18792, ip-1X-1XX-1-1XX.us-west-1.compute.internal
>> ): java.lang.NullPointerException
>>at
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>>at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>at java.lang.Thread.run(Thread.java:745)
>>
>>
>> I have debugged in local machine but haven’t been able to pin point the
>> cause of the error. Anyone knows why this might occur? Any suggestions?
>>
>>
>> Thanks,
>> Saurabh
>>
>>
>>
>>
>


Re: NullPointerException

2016-03-11 Thread Prabhu Joseph
Looking at ExternalSorter.scala line 192:

189    while (records.hasNext) {
         addElementsRead()
         kv = records.next()
         map.changeValue((getPartition(kv._1), kv._1), update)
         maybeSpillCollection(usingMap = true)
       }

On Sat, Mar 12, 2016 at 12:31 PM, Saurabh Guru 
wrote:

> I am seeing the following exception in my Spark Cluster every few days in
> production.
>
> 2016-03-12 05:30:00,541 - WARN  TaskSetManager - Lost task 0.0 in stage
> 12528.0 (TID 18792, ip-1X-1XX-1-1XX.us 
> -west-1.compute.internal
> ): java.lang.NullPointerException
>at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>at org.apache.spark.scheduler.Task.run(Task.scala:89)
>at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>at java.lang.Thread.run(Thread.java:745)
>
>
> I have debugged in local machine but haven’t been able to pin point the
> cause of the error. Anyone knows why this might occur? Any suggestions?
>
>
> Thanks,
> Saurabh
>
>
>
>


Re: Spark configuration with 5 nodes

2016-03-10 Thread Prabhu Joseph
Ashok,

   The cluster nodes have enough memory but relatively few CPU cores:
512GB / 16 cores = 32GB per core. Either there should be more cores available
to use the memory efficiently, or don't configure a large executor heap,
which will cause a lot of GC.
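
As a purely illustrative sizing sketch (all values are assumptions, not
recommendations): a moderate heap per executor keeps GC manageable and leaves
memory for the OS cache and the Hadoop daemons sharing the same hosts.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("sizing-sketch")
  .set("spark.executor.cores", "5")     // a few concurrent task slots per executor
  .set("spark.executor.memory", "20g")  // roughly 4GB per task slot rather than 32GB per core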

Thanks,
Prabhu Joseph

On Fri, Mar 11, 2016 at 3:45 AM, Ashok Kumar <ashok34...@yahoo.com.invalid>
wrote:

>
> Hi,
>
> We intend  to use 5 servers which will be utilized for building Bigdata
> Hadoop data warehouse system (not using any propriety distribution like
> Hortonworks or Cloudera or others).
> All servers configurations are 512GB RAM, 30TB storage and 16 cores,
> Ubuntu Linux servers. Hadoop will be installed on all the servers/nodes.
> Server 1 will be used for NameNode plus DataNode as well. Server 2 will be
> used for standby NameNode & DataNode. The rest of the servers will be used
> as DataNodes..
> Now we would like to install Spark on each servers to create Spark
> cluster. Is that the good thing to do or we should buy additional hardware
> for Spark (minding cost here) or simply do we require additional memory to
> accommodate Spark as well please. In that case how much memory for each
> Spark node would you recommend?
>
>
> thanks all
>


Re: Spark Scheduler creating Straggler Node

2016-03-08 Thread Prabhu Joseph
I don't just want to replicate all cached blocks. I am trying to find a way
to solve the issue I mentioned in the mail above. Having replicas for all
cached blocks will add more cost for customers.



On Wed, Mar 9, 2016 at 9:50 AM, Reynold Xin <r...@databricks.com> wrote:

> You just want to be able to replicate hot cached blocks right?
>
>
> On Tuesday, March 8, 2016, Prabhu Joseph <prabhujose.ga...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> When a Spark Job is running, and one of the Spark Executor on Node A
>> has some partitions cached. Later for some other stage, Scheduler tries to
>> assign a task to Node A to process a cached partition (PROCESS_LOCAL). But
>> meanwhile the Node A is occupied with some other
>> tasks and got busy. Scheduler waits for spark.locality.wait interval and
>> times out and tries to find some other node B which is NODE_LOCAL. The
>> executor on Node B will try to get the cached partition from Node A which
>> adds network IO to node and also some extra CPU for I/O. Eventually,
>> every node will have a task that is waiting to fetch some cached
>> partition from node A and so the spark job / cluster is basically blocked
>> on a single node.
>>
>> Spark JIRA is created https://issues.apache.org/jira/browse/SPARK-13718
>>
>> Beginning from Spark 1.2, Spark introduced External Shuffle Service to
>> enable executors fetch shuffle files from an external service instead of
>> from each other which will offload the load on Spark Executors.
>>
>> We want to check whether a similar thing of an External Service is
>> implemented for transferring the cached partition to other executors.
>>
>>
>> Thanks, Prabhu Joseph
>>
>>
>>


Spark Scheduler creating Straggler Node

2016-03-08 Thread Prabhu Joseph
Hi All,

When a Spark job is running, one of the Spark executors on Node A has some
partitions cached. Later, for some other stage, the scheduler tries to assign
a task to Node A to process a cached partition (PROCESS_LOCAL). But meanwhile
Node A is occupied with other tasks and is busy. The scheduler waits for the
spark.locality.wait interval, times out, and tries to find some other node B
which is NODE_LOCAL. The executor on Node B will try to fetch the cached
partition from Node A, which adds network IO to that node and also some extra
CPU for I/O. Eventually, every node has a task that is waiting to fetch some
cached partition from Node A, and so the Spark job / cluster is essentially
blocked on a single node.

Spark JIRA is created https://issues.apache.org/jira/browse/SPARK-13718

Beginning with Spark 1.2, Spark introduced the External Shuffle Service to
let executors fetch shuffle files from an external service instead of from
each other, which offloads that work from the Spark executors.

We want to check whether a similar external service is implemented for
serving cached partitions to other executors.
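
Until such a service exists, two existing knobs partially address this, each
with trade-offs; a minimal sketch (the input path and values are
placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object StragglerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("straggler-sketch")
      // Give up on PROCESS_LOCAL sooner, so tasks do not all queue behind Node A.
      .set("spark.locality.wait", "1s")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("hdfs:///some/input")      // placeholder path
      .map(line => (line.take(4), 1))
      .reduceByKey(_ + _)
      // Keep a second in-memory replica of each cached partition, so a hot
      // partition can be served from more than one node (at extra memory cost).
      .persist(StorageLevel.MEMORY_ONLY_2)

    counts.count()
    sc.stop()
  }
}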


Thanks, Prabhu Joseph


Spark Partitioner vs Spark Shuffle Manager

2016-03-07 Thread Prabhu Joseph
Hi All,

   What is the difference between the Spark partitioner and the Spark shuffle
manager? The Spark partitioner is by default the hash partitioner, and the
Spark shuffle manager is sort-based by default; the others are hash and
Tungsten sort.
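
A small sketch of the distinction, as far as I understand it: the shuffle
manager (spark.shuffle.manager) is an engine-wide setting for how shuffle
data is written and read, while a partitioner is a per-RDD mapping from key
to partition id.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerVsShuffleManager {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("partitioner-vs-shuffle-manager")
      .set("spark.shuffle.manager", "sort")        // how shuffle files are written (Spark 1.x)
    val sc = new SparkContext(conf)

    val p = new HashPartitioner(8)
    println(p.getPartition("INFO"))                // which of the 8 partitions this key maps to

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val reduced = pairs.reduceByKey(p, _ + _)      // keys routed by the partitioner,
                                                   // data written by the sort shuffle manager
    println(reduced.partitioner)                   // Some(HashPartitioner)
    sc.stop()
  }
}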

Thanks,
Prabhu Joseph


Spark Custom Partitioner not picked

2016-03-06 Thread Prabhu Joseph
Hi All,

When I submit a Spark job on YARN with a custom partitioner, it is not
picked up by the executors; the executors are still using the default
HashPartitioner. I added log statements into both HashPartitioner
(org/apache/spark/Partitioner.scala) and the custom partitioner. The
completed executor logs show HashPartitioner.

Below is the Spark application code with the custom partitioner and the log
line which was added into the HashPartitioner class of Partitioner.scala:

 
log.info("HashPartitioner="+key+"---"+numPartitions+""+Utils.nonNegativeMod(key.hashCode,
numPartitions))

The Executor logs has

16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42
16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42


How can I make sure the executors pick the right partitioner?



*Code:*
package org.apache.spark

class ExactPartitioner(partitions: Int) extends Partitioner with Logging{

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {

   log.info("ExactPartitioner=" + key)

   key match{
   case "INFO" => 0
   case "DEBUG" => 1
   case "ERROR" => 2
   case "WARN" => 3
   case "FATAL" => 4
   }
  }
}

object GroupByCLDB {

def main(args: Array[String]) {

val logFile = "/DATA"

val sparkConf = new SparkConf().setAppName("GroupBy")
sparkConf.set("spark.executor.memory","4g");
sparkConf.set("spark.executor.cores","2");
sparkConf.set("spark.executor.instances","2");

val sc = new SparkContext(sparkConf)
val logData = sc.textFile(logFile)


case class LogClass(one:String,two:String)

def parse(line: String) = {
  val pieces = line.split(' ')
  val level = pieces(2).toString
  val one = pieces(0).toString
  val two = pieces(1).toString
  (level,LogClass(one,two))
  }

val output = logData.map(x => parse(x))

val partitioned = output.partitionBy(new ExactPartitioner(5)).persist()
val groups = partitioned.groupByKey(new ExactPartitioner(5))
groups.count()

output.partitions.size
partitioned.partitions.size

}
}
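
One driver-side sanity check, assuming the definitions above are in scope,
that does not rely on executor logs: rdd.partitioner reports the partitioner
Spark has recorded for the RDD.

val partitioned = output.partitionBy(new ExactPartitioner(5)).persist()
println(partitioned.partitioner)       // expected: Some(ExactPartitioner)
println(partitioned.partitions.size)   // expected: 5

val groups = partitioned.groupByKey(new ExactPartitioner(5))
println(groups.partitioner)            // groupByKey should keep the passed-in partitioner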



Thanks,
Prabhu Joseph


Re: Spark on Yarn with Dynamic Resource Allocation. Container always marked as failed

2016-03-02 Thread Prabhu Joseph
Were all the NodeManager services restarted after the change in yarn-site.xml?

On Thu, Mar 3, 2016 at 6:00 AM, Jeff Zhang  wrote:

> The executor may fail to start. You need to check the executor logs, if
> there's no executor log then you need to check node manager log.
>
> On Wed, Mar 2, 2016 at 4:26 PM, Xiaoye Sun  wrote:
>
>> Hi all,
>>
>> I am very new to spark and yarn.
>>
>> I am running a BroadcastTest example application using spark 1.6.0 and
>> Hadoop/Yarn 2.7.1. in a 5 nodes cluster.
>>
>> I configured my configuration files according to
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>> 1. copy
>> ./spark-1.6.0/network/yarn/target/scala-2.10/spark-1.6.0-yarn-shuffle.jar
>> to /hadoop-2.7.1/share/hadoop/yarn/lib/
>> 2. yarn-site.xml is like this
>> http://www.owlnet.rice.edu/~xs6/yarn-site.xml
>> 3. spark-defaults.conf is like this
>> http://www.owlnet.rice.edu/~xs6/spark-defaults.conf
>> 4. spark-env.sh is like this http://www.owlnet.rice.edu/~xs6/spark-env.sh
>> 5. the command I use to submit spark application is: ./bin/spark-submit
>> --class org.apache.spark.examples.BroadcastTest --master yarn --deploy-mode
>> cluster ./examples/target/spark-examples_2.10-1.6.0.jar 1 1000 Http
>>
>> However, the job is stuck at RUNNING status, and by looking at the log, I
>> found that the executor is failed/cancelled frequently...
>> Here is the log output http://www.owlnet.rice.edu/~xs6/stderr
>> It shows something like
>>
>> 16/03/02 02:07:35 WARN yarn.YarnAllocator: Container marked as failed: 
>> container_1456905762620_0002_01_02 on host: bold-x.rice.edu. Exit 
>> status: 1. Diagnostics: Exception from container-launch.
>>
>>
>> Is there anybody know what is the problem here?
>> Best,
>> Xiaoye
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Spark job on YARN ApplicationMaster DEBUG log

2016-03-02 Thread Prabhu Joseph
Hi All,

I am trying to enable DEBUG logging for the Spark ApplicationMaster, but it
is not working.

On running the Spark job, I passed

-Dlog4j.configuration=file:/opt/mapr/spark/spark-1.4.1/conf/log4j.properties

The log4j.properties has log4j.rootCategory=DEBUG, console

The Spark executor containers have DEBUG logs, but the ApplicationMaster
container does not.
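
One combination that is sometimes used for this (the option keys exist in
Spark 1.4 on YARN, but treat the exact mix as an assumption to verify): ship
the log4j file to the containers and point the ApplicationMaster's JVM at the
localized copy.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Ship the file so it is localized into every container's working directory.
  .set("spark.yarn.dist.files", "file:/opt/mapr/spark/spark-1.4.1/conf/log4j.properties")
  // yarn-client mode: the AM is separate from the driver.
  .set("spark.yarn.am.extraJavaOptions", "-Dlog4j.configuration=log4j.properties")
  // yarn-cluster mode: the AM hosts the driver, so the driver options apply instead.
  .set("spark.driver.extraJavaOptions", "-Dlog4j.configuration=log4j.properties")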


Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Prabhu Joseph
Matthias,

 Can you try appending the jars to the LAUNCH_CLASSPATH in
spark-1.4.1/bin/spark-class?

2016-03-02 21:39 GMT+05:30 Matthias Niehoff :

> no, not to driver and executor but to the master and worker instances of
> the spark standalone cluster
>
> Am 2. März 2016 um 17:05 schrieb Igor Berman :
>
>> spark.driver.extraClassPath
>> spark.executor.extraClassPath
>>
>> 2016-03-02 18:01 GMT+02:00 Matthias Niehoff <
>> matthias.nieh...@codecentric.de>:
>>
>>> Hi,
>>>
>>> we want to add jars to the Master and Worker class path mainly for
>>> logging reason (we have a redis appender to send logs to redis -> logstash
>>> -> elasticsearch).
>>>
>>> While it is working with setting SPARK_CLASSPATH, this solution is
>>> afaik deprecated and should not be used. Furthermore we are also using 
>>> —driver-java-options
>>> and spark.executor.extraClassPath which leads to exceptions when
>>> running our apps in standalone cluster mode.
>>>
>>> So what is the best way to add jars to the master and worker classpath?
>>>
>>> Thank you
>>>
>>> --
>>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>>> 172.1702676
>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>>> www.more4fi.de
>>>
>>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>>> Schütz
>>>
>>>
>>
>>
>
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
>


Re: Concurrency does not improve for Spark Jobs with Same Spark Context

2016-02-18 Thread Prabhu Joseph
Fair Scheduler; the YARN queue has the entire cluster resource as its
maxResources, preemption does not come into the picture during the test case,
and all the Spark jobs got the requested resources.

The concurrent jobs with different SparkContexts run fine, so suspecting
resource contention does not seem correct.

The performance degrades only for concurrent jobs on a shared SparkContext.
Does SparkContext have any critical section that needs locking, with jobs
waiting to read it? I know Spark and Scala do not use the old threading
model; they use the actor model, where locking does not happen, but I still
want to verify whether old-style Java locking is used somewhere.



On Friday, February 19, 2016, Jörn Franke <jornfra...@gmail.com> wrote:

> How did you configure YARN queues? What scheduler? Preemption ?
>
> > On 19 Feb 2016, at 06:51, Prabhu Joseph <prabhujose.ga...@gmail.com
> <javascript:;>> wrote:
> >
> > Hi All,
> >
> >When running concurrent Spark Jobs on YARN (Spark-1.5.2) which share
> a single Spark Context, the jobs take more time to complete comparing with
> when they ran with different Spark Context.
> > The spark jobs are submitted on different threads.
> >
> > Test Case:
> >
> > A.  3 spark jobs submitted serially
> > B.  3 spark jobs submitted concurrently and with different
> SparkContext
> > C.  3 spark jobs submitted concurrently and with same Spark Context
> > D.  3 spark jobs submitted concurrently and with same Spark Context
> and tripling the resources.
> >
> > A and B takes equal time, But C and D are taking 2-3 times longer than
> A, which shows concurrency does not improve with shared Spark Context.
> [Spark Job Server]
> >
> > Thanks,
> > Prabhu Joseph
>


Concurrency does not improve for Spark Jobs with Same Spark Context

2016-02-18 Thread Prabhu Joseph
Hi All,

   When running concurrent Spark jobs on YARN (Spark 1.5.2) which share a
single SparkContext, the jobs take more time to complete compared with when
they run with different SparkContexts.
The Spark jobs are submitted on different threads.

Test Case:

A.  3 spark jobs submitted serially
B.  3 spark jobs submitted concurrently and with different SparkContext
C.  3 spark jobs submitted concurrently and with same Spark Context
D.  3 spark jobs submitted concurrently and with same Spark Context and
tripling the resources.

A and B take equal time, but C and D take 2-3 times longer than A, which
shows that concurrency does not improve with a shared SparkContext. [Spark
Job Server]

Thanks,
Prabhu Joseph


Re: Creating HiveContext in Spark-Shell fails

2016-02-15 Thread Prabhu Joseph
Thanks Mark, that answers my question.

On Tue, Feb 16, 2016 at 10:55 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
>
>   /_/
>
>
>
> Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_72)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
>
> scala> sqlContext.isInstanceOf[org.apache.spark.sql.hive.HiveContext]
>
> res0: Boolean = true
>
>
>
> On Mon, Feb 15, 2016 at 8:51 PM, Prabhu Joseph <prabhujose.ga...@gmail.com
> > wrote:
>
>> Hi All,
>>
>> On creating HiveContext in spark-shell, fails with
>>
>> Caused by: ERROR XSDB6: Another instance of Derby may have already booted
>> the database /SPARK/metastore_db.
>>
>> Spark-Shell already has created metastore_db for SqlContext.
>>
>> Spark context available as sc.
>> SQL context available as sqlContext.
>>
>> But without HiveContext, i am able to query the data using SqlContext .
>>
>> scala>  var df =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").option("inferSchema", "true").load("/SPARK/abc")
>> df: org.apache.spark.sql.DataFrame = [Prabhu: string, Joseph: string]
>>
>> So is there any real need for HiveContext inside Spark Shell. Is
>> everything that can be done with HiveContext, achievable with SqlContext
>> inside Spark Shell.
>>
>>
>>
>> Thanks,
>> Prabhu Joseph
>>
>>
>>
>>
>>
>


Creating HiveContext in Spark-Shell fails

2016-02-15 Thread Prabhu Joseph
Hi All,

On creating a HiveContext in spark-shell, it fails with

Caused by: ERROR XSDB6: Another instance of Derby may have already booted
the database /SPARK/metastore_db.

Spark-Shell already has created metastore_db for SqlContext.

Spark context available as sc.
SQL context available as sqlContext.

But without HiveContext, i am able to query the data using SqlContext .

scala>  var df =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").option("inferSchema", "true").load("/SPARK/abc")
df: org.apache.spark.sql.DataFrame = [Prabhu: string, Joseph: string]

So is there any real need for HiveContext inside the Spark shell? Is
everything that can be done with HiveContext achievable with SQLContext
inside the Spark shell?



Thanks,
Prabhu Joseph


Re: Spark worker abruptly dying after 2 days

2016-02-14 Thread Prabhu Joseph
Kartik,

 Spark workers won't start if SPARK_MASTER_IP is wrong. Maybe you used
start-slaves.sh from the master node to start all worker nodes, in which case
the workers would have got the correct SPARK_MASTER_IP initially. Any later
restart from the slave nodes would then have failed because of the wrong
SPARK_MASTER_IP on the worker nodes.

   Check the logs of the other running workers to see which SPARK_MASTER_IP
they have connected to; I don't think they are using a wrong master IP.


Thanks,
Prabhu Joseph

On Mon, Feb 15, 2016 at 12:34 PM, Kartik Mathur <kar...@bluedata.com> wrote:

> Thanks Prabhu ,
>
> I had wrongly configured spark_master_ip on the worker nodes to `hostname -f`,
> which is the worker and not the master,
>
> but now the question is why the cluster was up initially for 2 days and the
> workers only noticed this invalid configuration after 2 days. And why are the
> other workers still up even though they have the same setting?
>
> Really appreciate your help
>
> Thanks,
> Kartik
>
> On Sun, Feb 14, 2016 at 10:53 PM, Prabhu Joseph <
> prabhujose.ga...@gmail.com> wrote:
>
>> Kartik,
>>
>>The exception stack trace
>> *java.util.concurrent.RejectedExecutionException* will happen if
>> SPARK_MASTER_IP in worker nodes are configured wrongly like if
>> SPARK_MASTER_IP is a hostname of Master Node and workers trying to connect
>> to IP of master node. Check whether SPARK_MASTER_IP in Worker nodes are
>> exactly the same as what Spark Master GUI shows.
>>
>>
>> Thanks,
>> Prabhu Joseph
>>
>> On Mon, Feb 15, 2016 at 11:51 AM, Kartik Mathur <kar...@bluedata.com>
>> wrote:
>>
>>> on spark 1.5.2
>>> I have a spark standalone cluster with 6 workers , I left the cluster
>>> idle for 3 days and after 3 days I saw only 4 workers on the spark master
>>> UI , 2 workers died with the same exception -
>>>
>>> The strange part is that the cluster was running stable for 2 days, but on
>>> the third day 2 workers abruptly died. I see this error in one of the
>>> affected workers. No job ran for 2 days.
>>>
>>>
>>>
>>> 2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed!
>>> Waiting for master to reconnect...2016-02-14 01:12:59 ERROR Worker:75 -
>>> Connection to master failed! Waiting for master to reconnect...2016-02-14
>>> 01:13:10 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception in
>>> thread
>>> Thread[sparkWorker-akka.actor.default-dispatcher-2,5,main]java.util.concurrent.RejectedExecutionException:
>>> Task java.util.concurrent.FutureTask@514b13ad rejected from
>>> java.util.concurrent.ThreadPoolExecutor@17f8ec8d[Running, pool size =
>>> 1, active threads = 1, queued tasks = 0, completed tasks = 3]at
>>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
>>>at
>>> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
>>>at
>>> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
>>>at
>>> java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
>>>at
>>> org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$reregisterWithMaster$1.apply$mcV$sp(Worker.scala:269)
>>>at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
>>>  at 
>>> org.apache.spark.deploy.worker.Worker.org$apache$spark$deploy$worker$Worker$$reregisterWithMaster(Worker.scala:234)
>>>at
>>> org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:521)
>>>at 
>>> org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
>>>at
>>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
>>>at 
>>> org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
>>>at
>>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
>>>at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(Abs

Re: Spark worker abruptly dying after 2 days

2016-02-14 Thread Prabhu Joseph
Kartik,

   The exception stack trace java.util.concurrent.RejectedExecutionException
will happen if SPARK_MASTER_IP on the worker nodes is configured wrongly, for
example if SPARK_MASTER_IP is the hostname of the master node while the
workers try to connect to the IP of the master node. Check whether
SPARK_MASTER_IP on the worker nodes is exactly the same as what the Spark
master GUI shows.


Thanks,
Prabhu Joseph

On Mon, Feb 15, 2016 at 11:51 AM, Kartik Mathur <kar...@bluedata.com> wrote:

> on spark 1.5.2
> I have a spark standalone cluster with 6 workers , I left the cluster idle
> for 3 days and after 3 days I saw only 4 workers on the spark master UI , 2
> workers died with the same exception -
>
> The strange part is that the cluster was running stable for 2 days, but on
> the third day 2 workers abruptly died. I see this error in one of the
> affected workers. No job ran for 2 days.
>
>
>
> 2016-02-14 01:12:59 ERROR Worker:75 - Connection to master failed! Waiting
> for master to reconnect...2016-02-14 01:12:59 ERROR Worker:75 - Connection
> to master failed! Waiting for master to reconnect...2016-02-14 01:13:10
> ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception in thread
> Thread[sparkWorker-akka.actor.default-dispatcher-2,5,main]java.util.concurrent.RejectedExecutionException:
> Task java.util.concurrent.FutureTask@514b13ad rejected from
> java.util.concurrent.ThreadPoolExecutor@17f8ec8d[Running, pool size = 1,
> active threads = 1, queued tasks = 0, completed tasks = 3]at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
>at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
>at
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
>at
> java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
>at
> org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$reregisterWithMaster$1.apply$mcV$sp(Worker.scala:269)
>at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
>  at 
> org.apache.spark.deploy.worker.Worker.org$apache$spark$deploy$worker$Worker$$reregisterWithMaster(Worker.scala:234)
>at
> org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:521)
>at 
> org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
>at
> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
>at 
> org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
>at
> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
>at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>at
> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>  at
> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:467)at
> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:487)at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)at
> akka.dispatch.Mailbox.run(Mailbox.scala:220)at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>


Re: Spark Job on YARN accessing Hbase Table

2016-02-10 Thread Prabhu Joseph
Yes Ted, spark.executor.extraClassPath will work if the HBase client jars are
present on all Spark Worker / NodeManager machines.

spark.yarn.dist.files is the easier way, as the HBase client jars can be
copied from the driver machine or HDFS into the container / Spark executor
classpath automatically. There is no need to manually copy the HBase client
jars into spark.executor.extraClassPath on all Worker / NodeManager nodes.

 spark.yarn.dist.files brings the jars from the driver machine or HDFS into
the container / Spark executor working directory, but launch_container.sh
does not include the container's CWD/* in the classpath on hadoop-2.5.1, and
hence spark.yarn.dist.files does not work with hadoop-2.5.1. It works fine on
hadoop-2.7.0, as CWD/* is included in the container classpath through some
bug fix. Still searching for the JIRA.
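
Expressed as configuration, the two approaches discussed here might look like
the sketch below; all paths and jar names are placeholders, not tested values.

import org.apache.spark.SparkConf

// (a) HBase client jars already installed on every NodeManager host:
val confA = new SparkConf()
  .set("spark.executor.extraClassPath", "/opt/hbase/lib/*")

// (b) Ship the jars with the application; they are localized into each container's
//     working directory, so that directory has to end up on the executor classpath
//     (the part that failed on hadoop-2.5.1 above).
val confB = new SparkConf()
  .set("spark.yarn.dist.files",
       "file:/opt/hbase/lib/hbase-client-1.0.0.jar,file:/opt/hbase/lib/hbase-common-1.0.0.jar")
  .set("spark.executor.extraClassPath", "./hbase-client-1.0.0.jar:./hbase-common-1.0.0.jar")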

Thanks,
Prabhu Joseph



On Wed, Feb 10, 2016 at 4:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you tried adding hbase client jars to spark.executor.extraClassPath ?
>
> Cheers
>
> On Wed, Feb 10, 2016 at 12:17 AM, Prabhu Joseph <
> prabhujose.ga...@gmail.com> wrote:
>
>> + Spark-Dev
>>
>> For a Spark job on YARN accessing hbase table, added all hbase client
>> jars into spark.yarn.dist.files, NodeManager when launching container i.e
>> executor, does localization and brings all hbase-client jars into executor
>> CWD, but still the executor tasks fail with ClassNotFoundException of hbase
>> client jars, when i checked launch container.sh , Classpath does not have
>> $PWD/* and hence all the hbase client jars are ignored.
>>
>> Is spark.yarn.dist.files not for adding jars into the executor classpath.
>>
>> Thanks,
>> Prabhu Joseph
>>
>> On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph <prabhujose.ga...@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>>  When i do count on a Hbase table from Spark Shell which runs as
>>> yarn-client mode, the job fails at count().
>>>
>>> MASTER=yarn-client ./spark-shell
>>>
>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor,
>>> TableName}
>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>>
>>> val conf = HBaseConfiguration.create()
>>> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>>>
>>> val hBaseRDD = sc.newAPIHadoopRDD(conf,
>>> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
>>> hBaseRDD.count()
>>>
>>>
>>> Tasks throw below exception, the actual exception is swallowed, a bug
>>> JDK-7172206. After installing hbase client on all NodeManager machines, the
>>> Spark job ran fine. So I confirmed that the issue is with executor
>>> classpath.
>>>
>>> But i am searching for some other way of including hbase jars in spark
>>> executor classpath instead of installing hbase client on all NM machines.
>>> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that
>>> it localized all hbase jars, still the job fails. Tried
>>> spark.executor.extraClasspath, still the job fails.
>>>
>>> Is there any way we can access hbase from Executor without installing
>>> hbase-client on all machines.
>>>
>>>
>>> 16/02/09 02:34:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID
>>> 0, prabhuFS1): *java.lang.IllegalStateException: unread block data*
>>> at
>>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
>>> at
>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>
>>
>


Re: Spark Job on YARN accessing Hbase Table

2016-02-10 Thread Prabhu Joseph
+ Spark-Dev

For a Spark job on YARN accessing an hbase table, I added all the hbase
client jars to spark.yarn.dist.files. The NodeManager, when launching the
container (i.e. the executor), performs localization and brings all the
hbase-client jars into the executor CWD, but the executor tasks still fail
with ClassNotFoundException for the hbase client classes. When I checked
launch_container.sh, the classpath does not contain $PWD/*, so all the
hbase client jars are ignored.

Is spark.yarn.dist.files not meant for adding jars to the executor classpath?

Thanks,
Prabhu Joseph

On Tue, Feb 9, 2016 at 1:42 PM, Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> Hi All,
>
>  When i do count on a Hbase table from Spark Shell which runs as
> yarn-client mode, the job fails at count().
>
> MASTER=yarn-client ./spark-shell
>
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor,
> TableName}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE,"spark")
>
> val hBaseRDD = sc.newAPIHadoopRDD(conf,
> classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
> hBaseRDD.count()
>
>
> Tasks throw below exception, the actual exception is swallowed, a bug
> JDK-7172206. After installing hbase client on all NodeManager machines, the
> Spark job ran fine. So I confirmed that the issue is with executor
> classpath.
>
> But i am searching for some other way of including hbase jars in spark
> executor classpath instead of installing hbase client on all NM machines.
> Tried adding all hbase jars in spark.yarn.dist.files , NM logs shows that
> it localized all hbase jars, still the job fails. Tried
> spark.executor.extraClasspath, still the job fails.
>
> Is there any way we can access hbase from Executor without installing
> hbase-client on all machines.
>
>
> 16/02/09 02:34:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> prabhuFS1): *java.lang.IllegalStateException: unread block data*
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2428)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Thanks,
> Prabhu Joseph
>


Long running Spark job on YARN throws "No AMRMToken"

2016-02-08 Thread Prabhu Joseph
Hi All,

A long running Spark job on YARN throws below exception after running
for few days.

yarn.ApplicationMaster: Reporter thread fails 1 time(s) in a row.
org.apache.hadoop.yarn.exceptions.YarnException: *No AMRMToken found* for
user prabhu at org.apache.hadoop.yarn.ipc.RPCUtil.getRemoteException(RPCUtil.java:45)

Do any of the below renew the AMRMToken and solve the issue?

1. yarn-resourcemanager.delegation.token.max-lifetime increase from 7 days

2. Configuring Proxy user:

 hadoop.proxyuser.yarn.hosts *

 hadoop.proxyuser.yarn.groups *


3. Can Spark 1.4.0 handle this with the fix
https://issues.apache.org/jira/browse/SPARK-5342

spark.yarn.credentials.file


How to renew the AMRMToken for a long running job on YARN?
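
For option 3, the mechanism that SPARK-5342 introduced is usually driven
through the principal/keytab options of spark-submit (available from Spark
1.4.0 on YARN); spark.yarn.credentials.file is then managed by Spark
itself. A sketch, with a placeholder principal and keytab path (note this
mainly renews HDFS delegation tokens; whether it helps with the AMRMToken
error depends on the Hadoop/YARN version):

./spark-submit --master yarn-cluster --class MyLongRunningApp \
  --principal prabhu@EXAMPLE.COM \
  --keytab /etc/security/keytabs/prabhu.keytab \
  MyLongRunningApp.jar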


Thanks,
Prabhu Joseph


Re: Long running Spark job on YARN throws "No AMRMToken"

2016-02-08 Thread Prabhu Joseph
+ Spark-Dev

On Tue, Feb 9, 2016 at 10:04 AM, Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> Hi All,
>
> A long running Spark job on YARN throws below exception after running
> for few days.
>
> yarn.ApplicationMaster: Reporter thread fails 1 time(s) in a row.
> org.apache.hadoop.yarn.exceptions.YarnException: *No AMRMToken found* for
> user prabhu at org.apache.hadoop.yarn.ipc.RPC
> Util.getRemoteException(RPCUtil.java:45)
>
> Do any of the below renew the AMRMToken and solve the issue
>
> 1. yarn-resourcemanager.delegation.token.max-lifetime increase from 7 days
>
> 2. Configuring Proxy user:
>
>  hadoop.proxyuser.yarn.hosts *
> 
>  hadoop.proxyuser.yarn.groups *
> 
>
> 3. Can Spark-1.4.0 handle with fix
> https://issues.apache.org/jira/browse/SPARK-5342
>
> spark.yarn.credentials.file
>
>
> How to renew the AMRMToken for a long running job on YARN?
>
>
> Thanks,
> Prabhu Joseph
>
>
>
>
>


Re: Spark job does not perform well when some RDD in memory and some on Disk

2016-02-04 Thread Prabhu Joseph
Okay, the reason for the task delay within the executor when some RDDs are
in memory and some in Hadoop, i.e. multiple locality levels NODE_LOCAL and
ANY, is that the scheduler waits for *spark.locality.wait* (3 seconds by
default). During this period, the scheduler waits to launch a data-local
task before giving up and launching it on a less-local node.

So after setting it to 0, all tasks started in parallel. But I learned that
it is better not to reduce it to 0.
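
A middle ground is to lower the wait instead of disabling it, either
globally or per locality level (a sketch; the values are only illustrative,
and older releases expect plain milliseconds instead of the "1s" form):

./spark-submit --class MyApp \
  --conf spark.locality.wait=1s \
  --conf spark.locality.wait.node=1s \
  --conf spark.locality.wait.rack=500ms \
  MyApp.jar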


On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph <prabhujose.ga...@gmail.com>
wrote:

> Hi All,
>
>
> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
> RDD's created each approx 250MB data) and there are two jobs. Job A gets
> the line with "a" and the Job B gets the line with "b". The spark
> application is ran multiple times, each time with
> different executor memory, and enable/disable cache() function. Job A
> performance is same in all the runs as it has to read the entire data first
> time from Disk.
>
> Spark Cluster - standalone mode with Spark Master, single worker node (12
> cores, 16GB memory)
>
> val logData = sc.textFile(logFile, 2)
> var numAs = logData.filter(line => line.contains("a")).count()
> var numBs = logData.filter(line => line.contains("b")).count()
>
>
> *Job B (which has 5 tasks) results below:*
>
> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1 image]
>
> Since logData is not cached, the job B has to again read the 1.2GB
> data from hadoop into memory and all the 5 tasks started parallel and each
> took 2 sec (29ms for GC) and the
>  overall job completed in 2 seconds.
>
> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached took
> 4 seconds [ran2 image, ran2_cache image]
>
>  val logData = sc.textFile(logFile, 2).cache()
>
>  The Executor does not have enough memory to cache and hence again
> needs to read the entire 1.2GB data from hadoop into memory.  But since the
> cache() is used, leads to lot of GC pause leading to slowness in task
> completion. Each task started parallel and
> completed in 4 seconds (more than 1 sec for GC).
>
> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took 10
> seconds [ran3 image]*
>
>  The Executor has memory that can fit 4 RDD partitions into memory but
> 5th RDD it has to read from Hadoop. 4 tasks are started parallel and they
> completed in 0.3 seconds without GC. But the 5th task which has to read RDD
> from disk is started after 4 seconds, and gets completed in 2 seconds.
> Analysing why the 5th task is not started parallel with other tasks or at
> least why it is not started immediately after the other task completion.
>
> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
> took 0.3 seconds [ran4 image]
>
>  The executor has enough memory to cache all the 5 RDD. All 5 tasks
> are started in parallel and gets completed within 0.3 seconds.
>
>
> So Spark performs well when entire input data is in Memory or None. In
> case of some RDD in memory and some from disk, there is a delay in
> scheduling the fifth task, is it a expected behavior or a possible Bug.
>
>
>
> Thanks,
> Prabhu Joseph
>
>
>
>


Re: Spark job does not perform well when some RDD in memory and some on Disk

2016-02-04 Thread Prabhu Joseph
If spark.locality.wait is 0, then there are two performance issues:

   1. The Task Scheduler won't wait to schedule tasks as DATA_LOCAL; it will
launch them immediately on some node even if it is less local. The
probability of tasks running as less local will be higher, which affects
the overall job performance.
   2. When the Executor does not have enough heap memory and some tasks have
their RDD partitions in cache while others read from Hadoop, a
spark.locality.wait of 0 makes all the tasks start in parallel; since the
Executor process is then both memory and IO intensive, GC will be high and
tasks will be slower.


On Thu, Feb 4, 2016 at 5:13 PM, Alonso Isidoro Roman <alons...@gmail.com>
wrote:

> "But learned that it is better not to reduce it to 0."
>
> could you explain a bit more this sentence?
>
> thanks
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy) :
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
>  -  Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then programming
> must be the process of putting ..."
>   - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"
>
>
> 2016-02-04 11:33 GMT+01:00 Prabhu Joseph <prabhujose.ga...@gmail.com>:
>
>> Okay, the reason for the task delay within executor when some RDD in
>> memory and some in Hadoop i.e, Multiple Locality Levels NODE_LOCAL and ANY,
>> in this case Scheduler waits
>> for *spark.locality.wait *3 seconds default. During this period,
>> scheduler waits to launch a data-local task before giving up and launching
>> it on a less-local node.
>>
>> So after making it 0, all tasks started parallel. But learned that it is
>> better not to reduce it to 0.
>>
>>
>> On Mon, Feb 1, 2016 at 2:02 PM, Prabhu Joseph <prabhujose.ga...@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>>
>>> Sample Spark application which reads a logfile from hadoop (1.2GB - 5
>>> RDD's created each approx 250MB data) and there are two jobs. Job A gets
>>> the line with "a" and the Job B gets the line with "b". The spark
>>> application is ran multiple times, each time with
>>> different executor memory, and enable/disable cache() function. Job A
>>> performance is same in all the runs as it has to read the entire data first
>>> time from Disk.
>>>
>>> Spark Cluster - standalone mode with Spark Master, single worker node
>>> (12 cores, 16GB memory)
>>>
>>> val logData = sc.textFile(logFile, 2)
>>> var numAs = logData.filter(line => line.contains("a")).count()
>>> var numBs = logData.filter(line => line.contains("b")).count()
>>>
>>>
>>> *Job B (which has 5 tasks) results below:*
>>>
>>> *Run 1:* 1 executor with 2GB memory, 12 cores took 2 seconds [ran1
>>> image]
>>>
>>> Since logData is not cached, the job B has to again read the 1.2GB
>>> data from hadoop into memory and all the 5 tasks started parallel and each
>>> took 2 sec (29ms for GC) and the
>>>  overall job completed in 2 seconds.
>>>
>>> *Run 2:* 1 executor with 2GB memory, 12 cores and logData is cached
>>> took 4 seconds [ran2 image, ran2_cache image]
>>>
>>>  val logData = sc.textFile(logFile, 2).cache()
>>>
>>>  The Executor does not have enough memory to cache and hence again
>>> needs to read the entire 1.2GB data from hadoop into memory.  But since the
>>> cache() is used, leads to lot of GC pause leading to slowness in task
>>> completion. Each task started parallel and
>>> completed in 4 seconds (more than 1 sec for GC).
>>>
>>> *Run 3: 1 executor with 6GB memory, 12 cores and logData is cached took
>>> 10 seconds [ran3 image]*
>>>
>>>  The Executor has memory that can fit 4 RDD partitions into memory
>>> but 5th RDD it has to read from Hadoop. 4 tasks are started parallel and
>>> they completed in 0.3 seconds without GC. But the 5th task which has to
>>> read RDD from disk is started after 4 seconds, and gets completed in 2
>>> seconds. Analysing why the 5th task is not started parallel with other
>>> tasks or at least why it is not started immediately after the other task
>>> completion.
>>>
>>> *Run 4:* 1 executor with 16GB memory , 12 cores and logData is cached
>>> took 0.3 seconds [ran4 image]
>>>
>>>  The executor has enough memory to cache all the 5 RDD. All 5 tasks
>>> are started in parallel and gets completed within 0.3 seconds.
>>>
>>>
>>> So Spark performs well when entire input data is in Memory or None. In
>>> case of some RDD in memory and some from disk, there is a delay in
>>> scheduling the fifth task, is it a expected behavior or a possible Bug.
>>>
>>>
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>>
>>
>


Re: About cache table performance in spark sql

2016-02-03 Thread Prabhu Joseph
Sun,

   When the Executor doesn't have enough memory and it tries to cache the
data, it spends a lot of time on GC and the job will be slow. Either:

 1. Allocate enough memory to cache all the RDDs, so the job completes fast,
or
 2. Don't use cache when there is not enough Executor memory.

  To check the GC time, use --conf
"spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps" while submitting the job; SPARK_WORKER_DIR will then
contain the executor sysout with the GC log.
The sysout will show many "Full GC" events happening when cache is used and
the executor does not have enough heap.
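
Concretely, something like the following (a sketch; the application class,
jar and the exact layout under SPARK_WORKER_DIR are placeholders):

./spark-submit --class MyApp \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  MyApp.jar

# On the worker machine, count Full GC events in the executor sysout:
grep -c "Full GC" $SPARK_WORKER_DIR/app-*/*/stdout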


Thanks,
Prabhu Joseph

On Thu, Feb 4, 2016 at 11:25 AM, fightf...@163.com <fightf...@163.com>
wrote:

> Hi,
>
> I want to make sure that the cache table indeed would accelerate sql
> queries. Here is one of my use case :
>   impala table size : 24.59GB,  no partitions, with about 1 billion+ rows.
> I use sqlContext.sql to run queries over this table and try to do cache
> and uncache command to see if there
> is any performance disparity. I ran the following query :
> select * from video1203 where id > 10 and id < 20 and added_year != 1989
> I can see the following results :
>
> 1  If I did not run cache table and just ran sqlContext.sql(), I can see
> the above query run about 25 seconds.
> 2  If I firstly run sqlContext.cacheTable("video1203"), the query runs
> super slow and would cause driver OOM exception, but I can
> get final results with about running 9 minuts.
>
> Would any expert can explain this for me ? I can see that cacheTable cause
> OOM just because the in-memory columnar storage
> cannot hold the 24.59GB+ table size into memory. But why the performance
> is so different and even so bad ?
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>


Spark saveAsHadoopFile stage fails with ExecutorLostfailure

2016-02-02 Thread Prabhu Joseph
Hi All,

   A Spark job stage containing saveAsHadoopFile fails with
ExecutorLostFailure whenever the Executor is run with more cores. The stage
is not memory intensive and each executor has 20GB memory. For example:

With 6 executors, each with 6 cores, ExecutorLostFailure happens.

With 10 executors, each with 2 cores, saveAsHadoopFile runs fine.

What could be the reason for ExecutorLostFailure when the number of cores
per executor is high?



Error: ExecutorLostFailure (executor 3 lost)

16/02/02 04:22:40 WARN TaskSetManager: Lost task 1.3 in stage 15.0 (TID
1318, hdnprd-c01-r01-14):
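
For reference, the two executor shapes compared above correspond to
submissions like these (a sketch; class and jar names are placeholders):

# Fails with ExecutorLostFailure:
./spark-submit --master yarn-client --class MyApp \
  --num-executors 6 --executor-cores 6 --executor-memory 20G MyApp.jar

# Runs fine:
./spark-submit --master yarn-client --class MyApp \
  --num-executors 10 --executor-cores 2 --executor-memory 20G MyApp.jar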



Thanks,
Prabhu Joseph


Spark Executor retries infinitely

2016-02-01 Thread Prabhu Joseph
Hi All,

  When a Spark job (Spark 1.5.2) is submitted with a single executor and
the user passes some wrong JVM arguments with
spark.executor.extraJavaOptions, the first executor fails. But the job
keeps on retrying, creating a new executor and failing every time, until
CTRL-C is pressed. Do we have a configuration to limit the retry attempts?

*Example:*

./spark-submit --class SimpleApp --master "spark://10.10.72.145:7077"
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35
-XX:ConcGCThreads=16" /SPARK/SimpleApp.jar

Executor fails with

Error occurred during initialization of VM
Can't have more ConcGCThreads than ParallelGCThreads.

But the job does not exit; it keeps creating executors and retrying.
..
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: *Granted executor ID
app-20160201065319-0014/2846* on hostPort 10.10.72.145:36558 with 12 cores,
2.0 GB RAM
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2846 is now LOADING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2846 is now RUNNING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2846 is now EXITED (Command exited with code 1)
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
app-20160201065319-0014/2846 removed: Command exited with code 1
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
non-existent executor 2846
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: *Executor added:
app-20160201065319-0014/2847* on worker-20160131230345-10.10.72.145-36558 (
10.10.72.145:36558) with 12 cores
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20160201065319-0014/2847 on hostPort 10.10.72.145:36558 with 12 cores,
2.0 GB RAM
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2847 is now LOADING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2847 is now EXITED (Command exited with code 1)
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
app-20160201065319-0014/2847 removed: Command exited with code 1
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
non-existent executor 2847
16/02/01 06:54:28 INFO AppClient$ClientEndpoint:* Executor added:
app-20160201065319-0014/2848* on worker-20160131230345-10.10.72.145-36558 (
10.10.72.145:36558) with 12 cores
16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20160201065319-0014/2848 on hostPort 10.10.72.145:36558 with 12 cores,
2.0 GB RAM
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2848 is now LOADING
16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
app-20160201065319-0014/2848 is now RUNNING
....



Thanks,
Prabhu Joseph


Re: Spark Executor retries infinitely

2016-02-01 Thread Prabhu Joseph
Thanks Ted. My concern is how to avoid this kind of user error on a
production cluster; it would be better if Spark handled this instead of
creating a new Executor every second, failing each time and overloading the
Spark Master. Shall I report a Spark JIRA to handle this?
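
As Ted notes below, -XX:ParallelGCThreads apparently defaults to 8 here,
so -XX:ConcGCThreads=16 can never initialize. Keeping the two consistent
avoids the VM error in the first place (a sketch; the values are only
illustrative):

./spark-submit --class SimpleApp --master "spark://10.10.72.145:7077" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:ParallelGCThreads=16 -XX:ConcGCThreads=4" \
  /SPARK/SimpleApp.jar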


Thanks,
Prabhu Joseph


On Mon, Feb 1, 2016 at 9:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> I haven't found config knob for controlling the retry count after brief
> search.
>
> According to
> http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html ,
> default value for -XX:ParallelGCThreads= seems to be 8.
> This seems to explain why you got the VM initialization error.
>
> FYI
>
> On Mon, Feb 1, 2016 at 4:16 AM, Prabhu Joseph <prabhujose.ga...@gmail.com>
> wrote:
>
>> Hi All,
>>
>>   When a Spark job (Spark-1.5.2) is submitted with a single executor and
>> if user passes some wrong JVM arguments with
>> spark.executor.extraJavaOptions, the first executor fails. But the job
>> keeps on retrying, creating a new executor and failing every tim*e, *until
>> CTRL-C is pressed*. *Do we have configuration to limit the retry
>> attempts.
>>
>> *Example:*
>>
>> ./spark-submit --class SimpleApp --master "spark://10.10.72.145:7077"
>> --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35
>> -XX:ConcGCThreads=16" /SPARK/SimpleApp.jar
>>
>> Executor fails with
>>
>> Error occurred during initialization of VM
>> Can't have more ConcGCThreads than ParallelGCThreads.
>>
>> But the job does not exit, keeps on creating executors and retrying.
>> ..
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: *Granted executor ID
>> app-20160201065319-0014/2846* on hostPort 10.10.72.145:36558 with 12
>> cores, 2.0 GB RAM
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2846 is now LOADING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2846 is now RUNNING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2846 is now EXITED (Command exited with code 1)
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
>> app-20160201065319-0014/2846 removed: Command exited with code 1
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
>> non-existent executor 2846
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: *Executor added:
>> app-20160201065319-0014/2847* on
>> worker-20160131230345-10.10.72.145-36558 (10.10.72.145:36558) with 12
>> cores
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
>> app-20160201065319-0014/2847 on hostPort 10.10.72.145:36558 with 12
>> cores, 2.0 GB RAM
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2847 is now LOADING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2847 is now EXITED (Command exited with code 1)
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Executor
>> app-20160201065319-0014/2847 removed: Command exited with code 1
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Asked to remove
>> non-existent executor 2847
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint:* Executor added:
>> app-20160201065319-0014/2848* on
>> worker-20160131230345-10.10.72.145-36558 (10.10.72.145:36558) with 12
>> cores
>> 16/02/01 06:54:28 INFO SparkDeploySchedulerBackend: Granted executor ID
>> app-20160201065319-0014/2848 on hostPort 10.10.72.145:36558 with 12
>> cores, 2.0 GB RAM
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2848 is now LOADING
>> 16/02/01 06:54:28 INFO AppClient$ClientEndpoint: Executor updated:
>> app-20160201065319-0014/2848 is now RUNNING
>> 
>>
>>
>>
>> Thanks,
>> Prabhu Joseph
>>
>>
>>
>


Spark on YARN job continuously reports "Application does not exist in cache"

2016-01-13 Thread Prabhu Joseph
Hi All,

  When we submit Spark jobs on YARN, during RM failover we see a lot of
jobs reporting the below error messages.


*2016-01-11 09:41:06,682 INFO
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
Unregistering app attempt : appattempt_1450676950893_0280_01*
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1450676950893_0280_01 State change from FINAL_SAVING to
FAILED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl:
application_1450676950893_0280 State change from RUNNING to ACCEPTED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler:
Application appattempt_1450676950893_0280_01 is done. finalState=FAILED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
Registering app attempt : appattempt_1450676950893_0280_02
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo:
Application application_1450676950893_0280 requests cleared
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1450676950893_0280_02 State change from NEW to SUBMITTED
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher:
Cleaning master appattempt_1450676950893_0280_01
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler:
Added Application Attempt appattempt_1450676950893_0280_02 to scheduler
from user: glenm
2016-01-11 09:41:06,683 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
appattempt_1450676950893_0280_02 State change from SUBMITTED to
SCHEDULED




*2016-01-11 09:41:06,747 ERROR
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
AppAttemptId doesnt exist in cache appattempt_1450676950893_0280_01*
The ResourceManager has a ConcurrentMap where it puts the applicationId
while registering an application attempt. When a finishApplicationMaster
request arrives, it looks up the entry in the ConcurrentMap, and if no
entry is present it throws that ERROR message. When an application attempt
is unregistered, it removes the entry.

So, after the application attempt is unregistered, there are many
finishApplicationMaster requests causing the ERROR.

Need your help to understand in what scenario the above happens.


JIRA's related are

https://issues.apache.org/jira/browse/SPARK-1032
https://issues.apache.org/jira/browse/SPARK-3072



Thanks,
Prabhu Joseph


Re: How to find cause(waiting threads etc) of hanging job for 7 hours?

2016-01-11 Thread Prabhu Joseph
Umesh,

  A running task is a thread within the executor process, so we need to
take a stack trace of the executor process. The executor will be running on
some NodeManager machine as a container.

  The YARN RM UI's running-jobs page has the host details of where the
executor is running. Log in to that NodeManager machine; jps -l will list
all Java processes, and jstack -l <pid> will give the stack trace.
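
A minimal sampling loop for that, assuming the executor pid was found with
jps -l on the NodeManager host:

PID=12345   # replace with the executor pid shown by jps -l
# one stack trace every 2 seconds for about a minute (30 samples)
for i in $(seq 1 30); do
  echo "==== sample $i $(date) ====" >> /tmp/executor-stacks.log
  jstack -l $PID >> /tmp/executor-stacks.log
  sleep 2
done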


Thanks,
Prabhu Joseph

On Mon, Jan 11, 2016 at 7:56 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi Prabhu thanks for the response. How do I find pid of a slow running
> task. Task is running in yarn cluster node. When I try to see pid of a
> running task using my user I see some 7-8 digit number instead of user
> running process any idea why spark creates this number instead of
> displaying user
> On Jan 3, 2016 6:06 AM, "Prabhu Joseph" <prabhujose.ga...@gmail.com>
> wrote:
>
>> The attached image just has thread states, and WAITING threads need not
>> be the issue. We need to take thread stack traces and identify at which
>> area of code, threads are spending lot of time.
>>
>> Use jstack -l  or kill -3 , where pid is the process id of the
>> executor process. Take jstack stack trace for every 2 seconds and total 1
>> minute. This will help to identify the code where threads are spending lot
>> of time and then try to tune.
>>
>> Thanks,
>> Prabhu Joseph
>>
>>
>>
>> On Sat, Jan 2, 2016 at 1:28 PM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> Hi thanks I did that and I have attached thread dump images. That was
>>> the intention of my question asking for help to identify which waiting
>>> thread is culprit.
>>>
>>> Regards,
>>> Umesh
>>>
>>> On Sat, Jan 2, 2016 at 8:38 AM, Prabhu Joseph <
>>> prabhujose.ga...@gmail.com> wrote:
>>>
>>>> Take thread dump of Executor process several times in a short time
>>>> period and check what each threads are doing at different times which will
>>>> help to identify the expensive sections in user code.
>>>>
>>>> Thanks,
>>>> Prabhu Joseph
>>>>
>>>> On Sat, Jan 2, 2016 at 3:28 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>>
>>>>> Sorry please see attached waiting thread log
>>>>>
>>>>> <
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>>>> >
>>>>> <
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>>>> >
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-cause-waiting-threads-etc-of-hanging-job-for-7-hours-tp25850p25851.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>


Re: How to find cause(waiting threads etc) of hanging job for 7 hours?

2016-01-02 Thread Prabhu Joseph
The attached image just has thread states, and WAITING threads need not be
the issue. We need to take thread stack traces and identify in which area
of the code the threads are spending a lot of time.

Use jstack -l <pid> or kill -3 <pid>, where pid is the process id of the
executor process. Take a jstack stack trace every 2 seconds for a total of
1 minute. This will help identify the code where threads spend a lot of
time, and then try to tune it.

Thanks,
Prabhu Joseph



On Sat, Jan 2, 2016 at 1:28 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi thanks I did that and I have attached thread dump images. That was the
> intention of my question asking for help to identify which waiting thread
> is culprit.
>
> Regards,
> Umesh
>
> On Sat, Jan 2, 2016 at 8:38 AM, Prabhu Joseph <prabhujose.ga...@gmail.com>
> wrote:
>
>> Take thread dump of Executor process several times in a short time period
>> and check what each threads are doing at different times which will help to
>> identify the expensive sections in user code.
>>
>> Thanks,
>> Prabhu Joseph
>>
>> On Sat, Jan 2, 2016 at 3:28 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>>> Sorry please see attached waiting thread log
>>>
>>> <
>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>> >
>>> <
>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>> >
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-cause-waiting-threads-etc-of-hanging-job-for-7-hours-tp25850p25851.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: How to find cause(waiting threads etc) of hanging job for 7 hours?

2016-01-01 Thread Prabhu Joseph
Take a thread dump of the Executor process several times over a short time
period and check what each thread is doing at different times; this will
help identify the expensive sections in the user code.

Thanks,
Prabhu Joseph

On Sat, Jan 2, 2016 at 3:28 AM, unk1102 <umesh.ka...@gmail.com> wrote:

> Sorry please see attached waiting thread log
>
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
> >
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-cause-waiting-threads-etc-of-hanging-job-for-7-hours-tp25850p25851.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>