Re: Mesos Spark Fine Grained Execution - CPU count
We will be interested by the results if you give a try to Dynamic allocation with mesos ! - Mail Original - De: "Michael Gummelt"À: "Sumit Chawla" Cc: u...@mesos.apache.org, d...@mesos.apache.org, "User" , d...@spark.apache.org Envoyé: Lundi 19 Décembre 2016 22h42:55 GMT +01:00 Amsterdam / Berlin / Berne / Rome / Stockholm / Vienne Objet: Re: Mesos Spark Fine Grained Execution - CPU count > Is this problem of idle executors sticking around solved in Dynamic Resource > Allocation? Is there some timeout after which Idle executors can just > shutdown and cleanup its resources. Yes, that's exactly what dynamic allocation does. But again I have no idea what the state of dynamic allocation + mesos is. On Mon, Dec 19, 2016 at 1:32 PM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: Great. Makes much better sense now. What will be reason to have spark.mesos.mesosExecutor. cores more than 1, as this number doesn't include the number of cores for tasks. So in my case it seems like 30 CPUs are allocated to executors. And there are 48 tasks so 48 + 30 = 78 CPUs. And i am noticing this gap of 30 is maintained till the last task exits. This explains the gap. Thanks everyone. I am still not sure how this number 30 is calculated. ( Is it dynamic based on current resources, or is it some configuration. I have 32 nodes in my cluster). Is this problem of idle executors sticking around solved in Dynamic Resource Allocation? Is there some timeout after which Idle executors can just shutdown and cleanup its resources. Regards Sumit Chawla On Mon, Dec 19, 2016 at 12:45 PM, Michael Gummelt < mgumm...@mesosphere.io > wrote: > I should preassume that No of executors should be less than number of tasks. No. Each executor runs 0 or more tasks. Each executor consumes 1 CPU, and each task running on that executor consumes another CPU. You can customize this via spark.mesos.mesosExecutor.cores ( https://github.com/apache/spark/blob/v1.6.3/docs/running-on-mesos.md ) and spark.task.cpus ( https://github.com/apache/spark/blob/v1.6.3/docs/configuration.md ) On Mon, Dec 19, 2016 at 12:09 PM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: Ah thanks. looks like i skipped reading this " Neither will executors terminate when they’re idle." So in my job scenario, I should preassume that No of executors should be less than number of tasks. Ideally one executor should execute 1 or more tasks. But i am observing something strange instead. I start my job with 48 partitions for a spark job. In mesos ui i see that number of tasks is 48, but no. of CPUs is 78 which is way more than 48. Here i am assuming that 1 CPU is 1 executor. I am not specifying any configuration to set number of cores per executor. Regards Sumit Chawla On Mon, Dec 19, 2016 at 11:35 AM, Joris Van Remoortere < jo...@mesosphere.io > wrote: That makes sense. From the documentation it looks like the executors are not supposed to terminate: http://spark.apache.org/docs/latest/running-on-mesos.html#fine-grained-deprecated Note that while Spark tasks in fine-grained will relinquish cores as they terminate, they will not relinquish memory, as the JVM does not give memory back to the Operating System. Neither will executors terminate when they’re idle. I suppose your task to executor CPU ratio is low enough that it looks like most of the resources are not being reclaimed. If your tasks were using significantly more CPU the amortized cost of the idle executors would not be such a big deal. — Joris Van Remoortere Mesosphere On Mon, Dec 19, 2016 at 11:26 AM, Timothy Chen < tnac...@gmail.com > wrote: Hi Chawla, One possible reason is that Mesos fine grain mode also takes up cores to run the executor per host, so if you have 20 agents running Fine grained executor it will take up 20 cores while it's still running. Tim On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: > Hi > > I am using Spark 1.6. I have one query about Fine Grained model in Spark. > I have a simple Spark application which transforms A -> B. Its a single > stage application. To begin the program, It starts with 48 partitions. > When the program starts running, in mesos UI it shows 48 tasks and 48 CPUs > allocated to job. Now as the tasks get done, the number of active tasks > number starts decreasing. How ever, the number of CPUs does not decrease > propotionally. When the job was about to finish, there was a single > remaininig task, however CPU count was still 20. > > My questions, is why there is no one to one mapping between tasks and cpus > in Fine grained? How can these CPUs be released when the job is done, so > that other jobs can start. > > > Regards > Sumit Chawla -- Michael Gummelt Software Engineer Mesosphere --
Re: Mesos Spark Fine Grained Execution - CPU count
I think that what you are looking for is Dynamic resource allocation: http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster. - Mail Original - De: "Sumit Chawla"À: "Michael Gummelt" Cc: u...@mesos.apache.org, "Dev" , "User" , "dev" Envoyé: Lundi 19 Décembre 2016 19h35:51 GMT +01:00 Amsterdam / Berlin / Berne / Rome / Stockholm / Vienne Objet: Re: Mesos Spark Fine Grained Execution - CPU count But coarse grained does the exact same thing which i am trying to avert here. At the cost of lower startup, it keeps the resources reserved till the entire duration of the job. Regards Sumit Chawla On Mon, Dec 19, 2016 at 10:06 AM, Michael Gummelt < mgumm...@mesosphere.io > wrote: Hi I don't have a lot of experience with the fine-grained scheduler. It's deprecated and fairly old now. CPUs should be relinquished as tasks complete, so I'm not sure why you're seeing what you're seeing. There have been a few discussions on the spark list regarding deprecating the fine-grained scheduler, and no one seemed too dead-set on keeping it. I'd recommend you move over to coarse-grained. On Fri, Dec 16, 2016 at 8:41 AM, Chawla,Sumit < sumitkcha...@gmail.com > wrote: Hi I am using Spark 1.6. I have one query about Fine Grained model in Spark. I have a simple Spark application which transforms A -> B. Its a single stage application. To begin the program, It starts with 48 partitions. When the program starts running, in mesos UI it shows 48 tasks and 48 CPUs allocated to job. Now as the tasks get done, the number of active tasks number starts decreasing. How ever, the number of CPUs does not decrease propotionally. When the job was about to finish, there was a single remaininig task, however CPU count was still 20. My questions, is why there is no one to one mapping between tasks and cpus in Fine grained? How can these CPUs be released when the job is done, so that other jobs can start. Regards Sumit Chawla -- Michael Gummelt Software Engineer Mesosphere
Re: [SQL] Reading from hive table is listing all files in S3
Hi Mich, The data is stored as parquet. The table definition looks like : CREATE EXTERNAL TABLE nadata ( extract_date TIMESTAMP, date_formatted STRING, day_of_week INT, hour_of_day INT, entity_label STRING, entity_currency_id INT, entity_currency_label STRING, entity_margin_percentage FLOAT, entity2_id INT, entity2_label STRING, entity2_categories ARRAY, entity3_id INT, entity3_label STRING, entity3_categories ARRAY, entity4_id INT, entity4_hid INT, entity4_label STRING, entity4_total_budget DOUBLE ) PARTITIONED BY (day STRING,mba_id BIGINT,partition_id INT) STORED AS PARQUET LOCATION 's3a://bucketname/' Do you think the definition can be the source of the problem ? Thanks - Mail Original - De: "Mich Talebzadeh" <mich.talebza...@gmail.com> À: "Mehdi Meziane" <mehdi.mezi...@ldmobile.net> Cc: "user @spark" <user@spark.apache.org> Envoyé: Mercredi 3 Août 2016 16h47:46 GMT +01:00 Amsterdam / Berlin / Berne / Rome / Stockholm / Vienne Objet: Re: [SQL] Reading from hive table is listing all files in S3 Hi, Do you have a schema definition for this Hive table? What format is this table stored HTH Dr Mich Talebzadeh LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw http://talebzadehmich.wordpress.com Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On 3 August 2016 at 15:03, Mehdi Meziane < mehdi.mezi...@ldmobile.net > wrote: Hi all, We have a hive table stored in S3 and registered in a hive metastore. This table is partitionned with a key "day". So we access this table through the spark dataframe API as : sqlContext.read() .table("tablename) .where(col("day").between("2016-08-01","2016-08-02")) When the job is launched, we can see that spark have tasks "table" which have a small duration (seconds) but takes minutes. In the logs we see that every paths for every partitions are listed, regardless the partition key values, during minutes. 16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-24 16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-25 Is it a normal behaviour? Do we could specify something in the read().table, maybe some options? I tried to find such options but i cannot find anything. Thanks, Mehdi
[SQL] Reading from hive table is listing all files in S3
Hi all, We have a hive table stored in S3 and registered in a hive metastore. This table is partitionned with a key "day". So we access this table through the spark dataframe API as : sqlContext.read() .table("tablename) .where(col("day").between("2016-08-01","2016-08-02")) When the job is launched, we can see that spark have tasks "table" which have a small duration (seconds) but takes minutes. In the logs we see that every paths for every partitions are listed, regardless the partition key values, during minutes. 16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-24 16/08/03 13:17:16 INFO HadoopFsRelation: Listing s3a://buckets3/day=2016-07-25 Is it a normal behaviour? Do we could specify something in the read().table, maybe some options? I tried to find such options but i cannot find anything. Thanks, Mehdi
Spark ML - Java implementation of custom Transformer
Hi all, We have some problems while implementing custom Transformers in JAVA (SPARK 1.6.1). We do override the method copy, but it crashes with an AbstractMethodError. If we extends the UnaryTransformer, and do not override the copy method, it works without any error. We tried to write the copy like in these examples : https://github.com/apache/spark/blob/branch-2.0/mllib/src/test/java/org/apache/spark/ml/param/JavaTestParams.java https://github.com/eBay/Spark/blob/branch-1.6/examples/src/main/java/org/apache/spark/examples/ml/JavaDeveloperApiExample.java None of it worked. The copy is defined in the Params class as : /** * Creates a copy of this instance with the same UID and some extra params. * Subclasses should implement this method and set the return type properly. * * @see [[defaultCopy()]] */ def copy(extra: ParamMap): Params Any idea? Thanks, Mehdi