Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Mehdi Meziane
We would be interested in the results if you give dynamic allocation with Mesos a try!



- Original Mail - 
From: "Michael Gummelt"  
To: "Sumit Chawla"  
Cc: u...@mesos.apache.org, d...@mesos.apache.org, "User" 
, d...@spark.apache.org 
Sent: Monday, December 19, 2016 22:42:55 GMT +01:00 Amsterdam / Berlin / Bern / 
Rome / Stockholm / Vienna 
Subject: Re: Mesos Spark Fine Grained Execution - CPU count 



> Is this problem of idle executors sticking around solved in Dynamic Resource 
> Allocation? Is there some timeout after which idle executors can just 
> shut down and clean up their resources? 

Yes, that's exactly what dynamic allocation does. But again, I have no idea what 
the state of dynamic allocation + Mesos is. 
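
If you do give it a try, the standard knobs are roughly the following. This is only a sketch; the timeout value is just an example, and I haven't verified the exact external shuffle service setup on Mesos: 

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    // turn dynamic allocation on; it also needs the external shuffle service
    // running on the agents
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    // executors with no running tasks are released after this timeout
    .set("spark.dynamicAllocation.executorIdleTimeout", "60s");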



On Mon, Dec 19, 2016 at 1:32 PM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: 



Great. Makes much better sense now. What would be the reason to set 
spark.mesos.mesosExecutor.cores to more than 1, since this number doesn't include 
the number of cores for tasks? 


So in my case it seems like 30 CPUs are allocated to executors, and there are 
48 tasks, so 48 + 30 = 78 CPUs. I am noticing that this gap of 30 is maintained 
until the last task exits. This explains the gap. Thanks everyone. I am still 
not sure how this number 30 is calculated. (Is it dynamic based on current 
resources, or is it some configuration? I have 32 nodes in my cluster.) 


Is this problem of idle executors sticking around solved in Dynamic Resource 
Allocation? Is there some timeout after which idle executors can just shut down 
and clean up their resources? 





Regards 
Sumit Chawla 





On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt < mgumm...@mesosphere.io > 
wrote: 





> I should presume that the number of executors should be less than the number of tasks. 

No. Each executor runs 0 or more tasks. 

Each executor consumes 1 CPU, and each task running on that executor consumes 
another CPU. You can customize this via spark.mesos.mesosExecutor.cores ( 
https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md ) and 
spark.task.cpus ( 
https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md ) 
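
For illustration, here is a sketch that just makes the defaults explicit (assumed values). With these settings, M executors running N tasks show up as M + N CPUs in the Mesos UI: 

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    // cores the fine-grained executor itself holds for its lifetime
    .set("spark.mesos.mesosExecutor.cores", "1")
    // additional cores consumed by each running task
    .set("spark.task.cpus", "1");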





On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit < sumitkcha...@gmail.com > 
wrote: 



Ah, thanks. Looks like I skipped reading this: "Neither will executors terminate 
when they’re idle." 


So in my job scenario, I should presume that the number of executors should be less 
than the number of tasks; ideally one executor should execute 1 or more tasks. But 
I am observing something strange instead. I start my job with 48 partitions for 
a Spark job. In the Mesos UI I see that the number of tasks is 48, but the number of 
CPUs is 78, which is way more than 48. Here I am assuming that 1 CPU is 1 executor. I am 
not specifying any configuration to set the number of cores per executor. 



Regards 
Sumit Chawla 





On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere < jo...@mesosphere.io > 
wrote: 



That makes sense. From the documentation it looks like the executors are not 
supposed to terminate: 
http://spark.apache.org/docs/latest/running-on-mesos.html#fine-grained-deprecated
 


Note that while Spark tasks in fine-grained will relinquish cores as they 
terminate, they will not relinquish memory, as the JVM does not give memory 
back to the Operating System. Neither will executors terminate when they’re 
idle. 


I suppose your task-to-executor CPU ratio is low enough that it looks like most 
of the resources are not being reclaimed. If your tasks were using 
significantly more CPU, the amortized cost of the idle executors would not be 
such a big deal. 






— 
Joris Van Remoortere 
Mesosphere 

On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen < tnac...@gmail.com > wrote: 


Hi Chawla, 

One possible reason is that Mesos fine-grained mode also takes up cores 
to run the executor on each host, so if you have 20 agents running the 
fine-grained executor, it will take up 20 cores while it's still running. 

Tim 

On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: 


> Hi 
> 
> I am using Spark 1.6. I have one question about the fine-grained mode in Spark. 
> I have a simple Spark application which transforms A -> B. It's a single-stage 
> application. The program starts with 48 partitions. 
> When the program starts running, the Mesos UI shows 48 tasks and 48 CPUs 
> allocated to the job. As tasks get done, the number of active tasks 
> starts decreasing. However, the number of CPUs does not decrease 
> proportionally. When the job was about to finish, there was a single 
> remaining task, but the CPU count was still 20. 
> 
> My question is: why is there no one-to-one mapping between tasks and CPUs 
> in fine-grained mode? How can these CPUs be released when the job is done, so 
> that other jobs can start? 
> 
> 
> Regards 
> Sumit Chawla 





-- 







Michael Gummelt 
Software Engineer 
Mesosphere 




-- 








Re: Mesos Spark Fine Grained Execution - CPU count

2016-12-19 Thread Mehdi Meziane
I think that what you are looking for is Dynamic resource allocation: 
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
 


Spark provides a mechanism to dynamically adjust the resources your application 
occupies based on the workload. This means that your application may give 
resources back to the cluster if they are no longer used and request them again 
later when there is demand. This feature is particularly useful if multiple 
applications share resources in your Spark cluster. 

- Original Mail - 
From: "Sumit Chawla"  
To: "Michael Gummelt"  
Cc: u...@mesos.apache.org, "Dev" , "User" 
, "dev"  
Sent: Monday, December 19, 2016 19:35:51 GMT +01:00 Amsterdam / Berlin / Bern / 
Rome / Stockholm / Vienna 
Subject: Re: Mesos Spark Fine Grained Execution - CPU count 


But coarse-grained does the exact same thing that I am trying to avoid here. 
In exchange for lower task-launch overhead, it keeps the resources reserved for 
the entire duration of the job. 



Regards 
Sumit Chawla 



On Mon, Dec 19, 2016 at 10:06 AM, Michael Gummelt < mgumm...@mesosphere.io > 
wrote: 




Hi 

I don't have a lot of experience with the fine-grained scheduler. It's 
deprecated and fairly old now. CPUs should be relinquished as tasks complete, 
so I'm not sure why you're seeing what you're seeing. There have been a few 
discussions on the spark list regarding deprecating the fine-grained scheduler, 
and no one seemed too dead-set on keeping it. I'd recommend you move over to 
coarse-grained. 
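
If it helps, switching the same job over is mostly a matter of configuration; a sketch (the cores.max value is only an example cap, not a recommendation): 

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    // use the coarse-grained Mesos scheduler instead of fine-grained
    .set("spark.mesos.coarse", "true")
    // cap the total number of cores the job may hold across the cluster
    .set("spark.cores.max", "48");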





On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: 



Hi 


I am using Spark 1.6. I have one question about the fine-grained mode in Spark. I 
have a simple Spark application which transforms A -> B. It's a single-stage 
application. The program starts with 48 partitions. When the 
program starts running, the Mesos UI shows 48 tasks and 48 CPUs allocated to 
the job. As tasks get done, the number of active tasks starts 
decreasing. However, the number of CPUs does not decrease proportionally. When 
the job was about to finish, there was a single remaining task, but the CPU 
count was still 20. 


My question is: why is there no one-to-one mapping between tasks and CPUs in 
fine-grained mode? How can these CPUs be released when the job is done, so that 
other jobs can start? 






Regards 
Sumit Chawla 




-- 







Michael Gummelt 
Software Engineer 
Mesosphere 



Re: [SQL] Reading from hive table is listing all files in S3

2016-08-03 Thread Mehdi Meziane
Hi Mich, 


The data is stored as Parquet. 
The table definition looks like: 



CREATE EXTERNAL TABLE nadata ( 
extract_date TIMESTAMP, 
date_formatted STRING, 
day_of_week INT, 
hour_of_day INT, 
entity_label STRING, 
entity_currency_id INT, 
entity_currency_label STRING, 
entity_margin_percentage FLOAT, 
entity2_id INT, 
entity2_label STRING, 
entity2_categories ARRAY, 
entity3_id INT, 
entity3_label STRING, 
entity3_categories ARRAY, 
entity4_id INT, 
entity4_hid INT, 
entity4_label STRING, 
entity4_total_budget DOUBLE 
) 

PARTITIONED BY (day STRING,mba_id BIGINT,partition_id INT) 
STORED AS PARQUET 
LOCATION 's3a://bucketname/' 


Do you think the definition could be the source of the problem? 
Thanks 

- Original Mail - 
From: "Mich Talebzadeh" <mich.talebza...@gmail.com> 
To: "Mehdi Meziane" <mehdi.mezi...@ldmobile.net> 
Cc: "user @spark" <user@spark.apache.org> 
Sent: Wednesday, August 3, 2016 16:47:46 GMT +01:00 Amsterdam / Berlin / Bern / 
Rome / Stockholm / Vienna 
Subject: Re: [SQL] Reading from hive table is listing all files in S3 



Hi, 


Do you have a schema definition for this Hive table? 


In what format is this table stored? 


HTH 
















Dr Mich Talebzadeh 



LinkedIn 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 



http://talebzadehmich.wordpress.com 




Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 



On 3 August 2016 at 15:03, Mehdi Meziane < mehdi.mezi...@ldmobile.net > wrote: 





Hi all, 


We have a Hive table stored in S3 and registered in a Hive metastore. 
This table is partitioned with a key "day". 


So we access this table through the Spark DataFrame API as: 


sqlContext.read() 
.table("tablename) 
.where(col("day").between("2016-08-01","2016-08-02")) 


When the job is launched, we can see that Spark has "table" tasks with a 
small reported duration (seconds) that nevertheless take minutes. 
In the logs we see that the paths for every partition are listed, regardless of 
the partition key values, and this goes on for minutes. 


16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-24 
16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-25 
 


Is this normal behaviour? Can we specify something in read().table, 
maybe some options? 
I tried to find such options but I cannot find anything. 
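
One workaround we are considering, if nothing better exists, is to bypass the metastore and point the Parquet reader directly at the needed day= directories. This is only a sketch, assuming Spark 1.6's basePath option so that the partition columns stay in the schema (bucket name is a placeholder): 

import org.apache.spark.sql.DataFrame;

DataFrame df = sqlContext.read()
    // keep day/mba_id/partition_id as partition columns in the schema
    .option("basePath", "s3a://buckets3/")
    // only these day= prefixes get listed, instead of the whole bucket
    .parquet("s3a://buckets3/day=2016-08-01",
             "s3a://buckets3/day=2016-08-02");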


Thanks, 
Mehdi 


[SQL] Reading from hive table is listing all files in S3

2016-08-03 Thread Mehdi Meziane

Hi all, 


We have a Hive table stored in S3 and registered in a Hive metastore. 
This table is partitioned with a key "day". 


So we access this table through the Spark DataFrame API as: 


sqlContext.read() 
.table("tablename) 
.where(col("day").between("2016-08-01","2016-08-02")) 


When the job is launched, we can see that Spark has "table" tasks with a 
small reported duration (seconds) that nevertheless take minutes. 
In the logs we see that the paths for every partition are listed, regardless of 
the partition key values, and this goes on for minutes. 


16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-24 
16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-25 
 


Is this normal behaviour? Can we specify something in read().table, 
maybe some options? 
I tried to find such options but I cannot find anything. 


Thanks, 
Mehdi 

Spark ML - Java implementation of custom Transformer

2016-06-27 Thread Mehdi Meziane



Hi all, 


We have some problems implementing custom Transformers in Java (Spark 
1.6.1). 
We do override the copy method, but it crashes with an AbstractMethodError. 


If we extend UnaryTransformer and do not override the copy method, it 
works without any error. 


We tried to write the copy method like in these examples: 
https://github.com/apache/spark/blob/branch-2.0/mllib/src/test/java/org/apache/spark/ml/param/JavaTestParams.java
 
https://github.com/eBay/Spark/blob/branch-1.6/examples/src/main/java/org/apache/spark/examples/ml/JavaDeveloperApiExample.java
 


Neither of them worked. 


The copy method is defined in the Params class as: 


/** 
* Creates a copy of this instance with the same UID and some extra params. 
* Subclasses should implement this method and set the return type properly. 
* 
* @see [[defaultCopy()]] 
*/ 
def copy(extra: ParamMap): Params 
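
For reference, here is a reduced sketch of what we are trying, following the JavaTestParams pattern linked above. Class and UID prefix names are just placeholders; the String-uid constructor is there because defaultCopy appears to re-create the instance with the same UID: 

import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.util.Identifiable$;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.StructType;

public class MyTransformer extends Transformer {
  private final String uid_;

  public MyTransformer(String uid) { this.uid_ = uid; }

  public MyTransformer() { this(Identifiable$.MODULE$.randomUID("myTransformer")); }

  @Override
  public String uid() { return uid_; }

  @Override
  public DataFrame transform(DataFrame dataset) {
    return dataset;  // placeholder: identity transform
  }

  @Override
  public StructType transformSchema(StructType schema) {
    return schema;   // placeholder: schema unchanged
  }

  @Override
  public MyTransformer copy(ParamMap extra) {
    // same pattern as JavaTestParams / JavaDeveloperApiExample
    return defaultCopy(extra);
  }
}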


Any idea? 
Thanks, 


Mehdi