[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21308 @rdblue when you say "you don't think the API proposed here needs to support a first-class partition concept", are you referring to the DeleteSupport interface, or to DataSourceV2 in general? If you are referring to DeleteSupport, do you have the same objections to separate "DropPartition"/"AddPartition" interfaces? If you mean that DataSourceV2 in general does not need to support partitions as a first-class concept, then how are users of Spark supposed to perform operations such as 1. adding, 2. altering, 3. removing, and 4. listing partitions on those data sources that are represented by particular instances of DataSourceV2? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21308 @rdblue I think our debate is over whether we should expose an API to represent direct operations on partitions in the new data source API.
[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21308 @rdblue Actually: https://issues.apache.org/jira/browse/SPARK-22389.
[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21308 @rdblue What about those data sources that support record deletion and partition dropping as two semantically different operations - Kudu and HBase being two examples? All systems that support partitions have a distinct API for dealing with partition-level ops. Even file-based table storage systems support the different levels of manipulation. (Look at the SQL DDL that Impala supports for Parquet partitions for an example - it uses a filter, but the command means "this partition op applies to the partition that is defined by this filter", not "apply this op to all records that match this filter".) The difference is subtle, but it is an important one, and every system that supports partitions enforces that difference for a reason.
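To make that semantic difference concrete, the two kinds of operation could be modelled as separate mix-ins, along the lines of the sketch below. This is purely illustrative: none of these names (SupportsPartitionManagement, PartitionSpec, deleteWhere) are part of the PR under discussion.

```scala
// Hypothetical sketch only; not part of the DataSourceV2 proposal.
case class PartitionSpec(values: Map[String, String])

trait SupportsPartitionManagement {
  // Partition-level operations act on partition *definitions*,
  // independently of any records the partitions may contain.
  def addPartition(spec: PartitionSpec): Unit
  def dropPartition(spec: PartitionSpec): Unit
  def listPartitions(): Seq[PartitionSpec]
}

trait SupportsDelete {
  // Record-level deletion: applies to every record matching the filter,
  // however the table happens to be partitioned.
  def deleteWhere(filter: String): Unit
}
```

Under such a split, a source like Kudu or HBase could implement both traits with different semantics, while a source with no partition concept would implement only the second.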
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21306 So Kudu range partitions support arbitrarily sized partition intervals, like the example below, where the first and last range partitions are roughly half a year in size, but the middle partition is one year in size.

```sql
-- Make a table representing a date/time value as TIMESTAMP.
-- The strings representing the partition bounds are automatically
-- cast to TIMESTAMP values.
create table native_timestamp (
  id bigint,
  when_exactly timestamp,
  event string,
  primary key (id, when_exactly)
)
range (when_exactly) (
  partition '2015-06-01' <= values < '2016-01-01',
  partition '2016-01-01' <= values < '2017-01-01',
  partition '2017-01-01' <= values < '2017-06-01'
)
stored as kudu;
```
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21306 Sure, I am looking at it from the point of view of supporting Kudu. Check out https://kudu.apache.org/docs/schema_design.html#partitioning for some of the details, in particular https://kudu.apache.org/2016/08/23/new-range-partitioning-features.html. As Kudu is a column store, each column also has attributes associated with it, such as encoding and compression codecs. I really think that partitions should be considered part of the table schema. They have an existence above and beyond the definition of a filter that matches a record: adding an empty partition changes the state of many underlying systems. Many systems that support partitions also have APIs for adding and removing partition definitions, and some systems require partition information to be specified during table creation. Those systems that support changing partitions after creation usually have specific commands for adding and removing partitions. Dale. From: Ryan Blue Sent: Tuesday, 4 September 2018 4:20 PM To: apache/spark Cc: tigerquoll; Comment Subject: Re: [apache/spark] [SPARK-24252][SQL] Add catalog registration and table catalog APIs. (#21306) Can we support column range partition predicates please? This has an "apply" transform for passing other functions directly through, so that may help if you have additional transforms that aren't committed to Spark yet. As for range partitioning, can you be more specific about what you mean? What does that transform function look like? Part of the rationale for the existing proposal is that these are all widely used and understood. I want to make sure that as we expand the set of validated transforms, we aren't introducing confusion.
Also, could you share the use case you intend for this? It would be great to hear about uses other than just Iceberg tables. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21308 I am assuming this API was intended to support the "drop partition" use-case. I'm arguing that adding and deleting partitions deal with a concept slightly higher-level than just a bunch of records that match a filter. Backing this up is the fact that partitions are defined independently of any records they may or may not contain - you can add an empty partition and the underlying state of the system will change. Also, as an end user I would be very upset if I meant to drop a partition, but because of a transcription error accidentally started a delete process - with a filter that didn't exactly match a partition definition - that takes a million times as long to execute. Partitions are an implementation optimisation that has leaked into higher-level APIs because they are an extremely useful and performant optimisation. I am wondering if we should represent them in this API as something slightly higher-level than just a filter definition.
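The transcription-error hazard can be sketched in a few lines. Everything here is hypothetical: `table`, `dropPartition`, and `deleteWhere` are illustrative stand-ins, not part of the proposed API.

```scala
// Hypothetical stand-in for a partitioned table; not any real Spark API.
object table {
  def dropPartition(spec: Map[String, String]): Unit =
    println(s"metadata-only drop of partition $spec")
  def deleteWhere(filter: String): Unit =
    println(s"scanning every record matching: $filter")
}

// Intended: a cheap metadata operation on one partition definition.
table.dropPartition(Map("event_date" -> "2018-01-01"))

// A small slip in a filter-based delete silently turns a metadata
// operation into a full scan-and-delete over the whole table:
table.deleteWhere("event_date <= '2018-01-01'") // '<=' where '=' was meant
```

With a first-class partition operation, the second call simply cannot express the mistake; with a filter-only API, nothing stops it.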
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r214808418

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These are passed to
+ * {@link TableCatalog#alterTable}. For example,
+ *
+ *   import TableChange._
+ *   val catalog = source.asInstanceOf[TableSupport].catalog()
+ *   catalog.alterTable(ident,
+ *       addColumn("x", IntegerType),
+ *       renameColumn("a", "b"),
+ *       deleteColumn("c")
+ *   )
+ */
+public interface TableChange {
--- End diff --

Is adding or dropping table partitions a table change?
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r214806854

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java ---
@@ -0,0 +1,182 @@ (same hunk as in the previous comment)
+public interface TableChange {
--- End diff --

Can we support adding a comment to a column? / a table?
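If the answer to both review questions were "yes", the requested operations could be expressed as additional TableChange cases alongside the column changes the PR already defines. A minimal sketch, in Scala for brevity - none of these case names exist in the PR; they are assumptions for illustration:

```scala
// Hypothetical extension of the TableChange idea; names are illustrative only.
sealed trait TableChange
case class AddColumn(name: String, dataType: String) extends TableChange
case class SetColumnComment(name: String, comment: String) extends TableChange
case class SetTableComment(comment: String) extends TableChange
case class AddPartition(spec: Map[String, String]) extends TableChange
case class DropPartition(spec: Map[String, String]) extends TableChange

// A caller could then mix partition and comment changes into one alterTable:
// catalog.alterTable(ident,
//   AddColumn("x", "int"),
//   SetColumnComment("x", "an example column"),
//   AddPartition(Map("event_date" -> "2018-01-01")))
```

Whether partition changes belong in alterTable or in a separate partition-management interface is exactly the design question being debated.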
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user tigerquoll commented on the issue: https://github.com/apache/spark/pull/21306 Can we support column range partition predicates please?
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/5250#issuecomment-116651383 Correct. Cheers, Dale. Date: Fri, 19 Jun 2015 06:01:26 -0700 From: notificati...@github.com To: sp...@noreply.github.com CC: tigerqu...@outlook.com Subject: Re: [spark] [CORE] [SPARK-6593] Provide option for HadoopRDD to skip corrupted files (#5250) I think this should be closed now in favor of #5368, right? Reply to this email directly or view it on GitHub. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
Github user tigerquoll closed the pull request at: https://github.com/apache/spark/pull/5250
[GitHub] spark pull request: [SPARK-6214][CORE] - A simple expression langu...
Github user tigerquoll closed the pull request at: https://github.com/apache/spark/pull/4937
[GitHub] spark pull request: [SPARK-6214][CORE] - A simple expression langu...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/4937#issuecomment-98918915 Ok, no problem. I can understand why. Regards, Dale. Sent from my iPad On 4 May 2015, at 9:48 pm, Sean Owen notificati...@github.com wrote: Do you mind closing this PR, simply because I don't think it's going to proceed in Spark? It will not disappear but still be here for reference, and as I say I do think it's interesting enough to merit its own project. Reply to this email directly or view it on GitHub.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide a HadoopRDD varian...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/5368#issuecomment-93705438 Sounds like a reasonable idea; I'll put another PR together. Regards, Dale. Date: Wed, 15 Apr 2015 10:25:26 -0700 From: notificati...@github.com To: sp...@noreply.github.com CC: tigerqu...@outlook.com Subject: Re: [spark] [CORE] [SPARK-6593] Provide a HadoopRDD variant that wraps all reads in a Try (#5368) An RDD that handles read exceptions with Try does seem useful to me - I've seen a decent amount of boilerplate used to accomplish the same thing. Is there a reason this is HadoopRDD-specific? I.e. why not provide something like a TryRDD that can wrap any RDD doing a risky operation? Reply to this email directly or view it on GitHub.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide a HadoopRDD varian...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/5368#issuecomment-89942497 The name of the RDD is a placeholder. I'm happy to accept any suggestions you have for a name that makes more sense to you. The InputFormat-based solution you mention implies swallowing exceptions and continuing without a trace - something you argued against in the previous PR (https://github.com/apache/spark/pull/5250) related to this issue.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide a HadoopRDD varian...
GitHub user tigerquoll opened a pull request: https://github.com/apache/spark/pull/5368 [CORE] [SPARK-6593] Provide a HadoopRDD variant that wraps all reads in a Try @rxin @sryza As per the attached JIRA ticket, I was proposing some way of making Hadoop IO more robust in response to malformatted files that cause an exception in the Hadoop input libraries. The previous PR (#5250) simply added an option to ignore exceptions and continue processing. This raised some concerns about not giving any indication to the user that an error had occurred (other than a warning message in the logs). As an alternative to the first PR, and to keep the conversation moving, I'm putting forward this PR, which contains HadoopReliableRDD. It wraps all Hadoop IO in a Try and passes that back to the user, so that the user is forced to act upon any non-fatal errors that occur when reading from a Hadoop file system using this API. An example of using this API is given below:

```
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import scala.util.{Failure, Success, Try}

// hdfs directory contains test[1-4].txt.gz - test4.txt.gz is corrupted
val path = "hdfs:///user/cloudera/*.gz"
val testRdd = sc.hadoopReliableFile(path, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], 2)
...
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test4.txt.gz:0+42544
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test3.txt.gz:0+15043
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test1.txt.gz:0+15043
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test2.txt.gz:0+15043
...
15/04/05 21:24:57 WARN rdd.HadoopReliableRDD: Exception on read attempt IOException - not a gzip file - stopping any further read attempts from this split
...
testRdd.count
...
res5: Long = 4384

testRdd.filter(x => x.isSuccess).count
...
res6: Long = 4383

testRdd.map(_ match {
  case Failure(ex) => s"${ex.getClass.getSimpleName}: ${ex.getMessage}"
  case Success(_) => "success"
}).countByValue()
...
res7: scala.collection.Map[String,Long] = Map(IOException: not a gzip file -> 1, success -> 4383)

testRdd.filter(_.isSuccess).map(_.get.toString).flatMap(_.split(" ")).countByValue
...
... and on we go ...
```

Just to emphasise what we are doing here: we are trapping exceptions that previously would have stopped Spark from executing, and continuing to process data in other tasks rather than terminating the entire application, which is what previously happened. If the exceptions were being raised in a non-deterministic way (e.g. a temporary network failure), then the produced data could be different if the code is run a second time. If we didn't handle these errors, then the entire application would fail in a non-deterministic way. A lot of the time the latter behaviour is the desirable response, but sometimes it is not. If the exceptions were being raised in a deterministic way (e.g. the file was corrupt before it was copied into HDFS), then the data will be produced in a deterministic way that can be repeated with the same results. I believe giving users some way to process data that Spark currently crashes on is worth considering, whatever form it finally takes.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/tigerquoll/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5368.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5368
commit bffc68e64098fe24464dfe0eca0eba15afef85c8 Author: Dale tigerqu...@outlook.com Date: 2015-04-04T11:43:03Z [SPARK-6593] Refactored HadoopRDD so that the iterator could be cleanly wrapped
commit fd75bc1dae5b707bcdd914d4bca852b2594daa60 Author: Dale tigerqu...@outlook.com Date: 2015-04-04T11:44:09Z [SPARK-6593] Added HadoopReliableRDD, which returns all read actions in a wrapped in a Try
commit d2ab7044516ebda33ceeca70bb6aa9c217af42ca Author: Dale tigerqu...@outlook.com Date: 2015-04-05T12:14:41Z [SPARK-6593] Fine-tuned the iterator wrapper logic
commit cb88164bce0552617b5dbdfdff0f02fe018d84d5 Author: Dale tigerqu...@outlook.com Date: 2015-04-05T12:15:48Z [SPARK-6593] Added convenience method hadoopReliableFile() to Scala Context
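The core idea - wrap each read in a Try, and stop reading a split after its first failure - can be sketched outside Spark in plain Scala. This is a toy illustration of the technique, not the actual HadoopReliableRDD code; the record sources are modelled as thunks.

```scala
import scala.util.{Failure, Success, Try}

// Wrap each read in a Try; after the first failure, attempt no further
// reads from this source (mirroring the "stop reading the split" policy).
def reliableIterator[T](reads: Iterator[() => T]): Iterator[Try[T]] = {
  var failed = false
  reads.takeWhile(_ => !failed).map { read =>
    val result = Try(read())
    if (result.isFailure) failed = true // no further reads from this split
    result
  }
}

// A "split" where the third record is corrupt:
val reads: Iterator[() => Int] =
  Iterator(() => 1, () => 2, () => sys.error("not a gzip file"), () => 4)

val results = reliableIterator(reads).toList
// Two successes, then one Failure; the fourth read is never attempted.
```

The caller sees every error as data (a Failure element) rather than as an application-terminating exception, which is exactly the contract the PR proposes.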
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/5250#discussion_r27377569

--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -246,6 +249,15 @@ class HadoopRDD[K, V](
       } catch {
         case eof: EOFException =>
           finished = true
+        case e: Exception =>
--- End diff --

Having been on the receiving end of things, I know that the gzip module throws an IOException, but unfortunately I have no knowledge of what exceptions the Hadoop input modules throw, or whether they propagate exceptions up from other third-party libraries. Catching such a broad exception is mitigated by the fact that this particular option defaults to off, and should only be enabled when you are trying to parse files that you know are corrupt. Given that situation, when you turn the option on we should really try to finish processing files to the best of our ability, so I think that in this case catching 'Exception' might be appropriate.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/5250#issuecomment-87681300 If a user can write Scala code that appropriately deals with the problem, why can't they write Spark code to deal with it in parallel? Isn't this what Spark is about? Isn't this a problem that can be readily parallelised? Spark is being put forward as a data processing framework - bad data needs to be handled in some way better than just refusing to have anything to do with it. I believe that parallelising your suggested solution means adding to the public API, which takes time and consideration. The option was conceived as a scoped, quick-fix solution to at least give users some ability to continue - the idea would be to retire the option once a new API was in place to deal with the problem gracefully. As for the option being presented to users as a fine thing to do when I don't believe it is - how about providing the information to the users and letting them choose for themselves? You make a good point about an option being part of the public API, though - what is the understanding of how stable options are? No real Experimental or DeveloperAPI tags are available here. Your proposed solution was the same one I ended up settling on when first confronted with the issue - but only after a number of frustrated attempts at getting Spark to do what I wanted it to. What you proposed, and what I did in the end, was to give up on Spark and bash out some standalone code using Hadoop libraries to do the job, i.e. stop using Spark and use another tool that made my job easier. I felt that it didn't have to be this way.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/5250#issuecomment-87650287 Hi Sean, Thanks for your input - your views have helped me refine my thinking on the matter. If you take a purist's point of view, then yes, you can say the source of the problem is (most likely) with the data producer and should be fixed at the data producer's end. The point is that this is a problem that is affecting many Spark users right now, and many users are not in control of the source system of the data they are analysing and are forced to 'make do' with what they have. You call this solution a band-aid - but many ETL solutions are band-aids, and providing this functionality is useful and serves a purpose for the end user. Are you concerned that swallowing an exception could leave the Hadoop input libraries in an inconsistent state, causing more data corruption? This will not happen, because swallowing the exception triggers the immediate finish of the file-reading task and no more data will be read by that task. Are you concerned that swallowing an exception indicates that something has potentially gone wrong earlier in the Hadoop input read, and that previous data could have been corrupted? The user already knows this is potentially the case, because running the application without this option enabled has caused the application to terminate in the first place. The fact that we are being more permissive of potentially corrupt data is a show-stopper for this being default behaviour - but I'm not proposing this be default behaviour. I'm proposing this be a last-ditch option that an advanced user can knowingly enable when attempting to deal with corrupted data, with the understanding that their data could be made worse, but that most likely corrupt data will simply be omitted. The alternative is to tell them that their data is not suitable for being loaded into Spark, and that perhaps they should use another tool, or tell the data system owner to fix their data feeds and get back to them with another data set some time in the future. I know which option I would prefer if given the choice - don't let perfect be the enemy of good.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/5250#issuecomment-87822606 To sum up the debate so far: there don't appear to be any major concerns about leaving the system in an inconsistent state. The major concern seems to be about swallowing the exception and not letting any sign of it propagate back up to the main flow of execution, with the additional potential risk that doing so could lead to non-deterministic results. By swallowing non-deterministic exceptions, we are explicitly converting non-deterministic application crashes into potentially non-deterministic data - there is no getting around this fact. I'd like to achieve an outcome here, and I'm not really wedded to the means by which that outcome is achieved. As a counter-proposal, how about a new HadoopRDD derivative that works with Try[(K, V)]? We propagate the exception cleanly back to the calling code and allow it to deal with the exception explicitly. It is slightly more cumbersome to use, but you'd only use it when you are expecting corrupted data back, and it allows easy manipulation of data that contains potential exceptions.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/5250#issuecomment-87825625 I'd also like to point out the converse of my second paragraph above - we are also providing a means of converting deterministic application crashes into deterministic data.
[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...
GitHub user tigerquoll opened a pull request: https://github.com/apache/spark/pull/5250 [CORE] [SPARK-6593] Provide option for HadoopRDD to skip bad data splits When reading a large number of files from HDFS, e.g. with sc.textFile("hdfs:///user/cloudera/logs*.gz"), if a single split is corrupted then the entire job is cancelled. As default behaviour this is probably for the best, but in some circumstances, where you know it will be OK, it would be nice to have the option to skip the corrupted portion and continue the job. Ideally I'd like to be able to report the list of bad files directly back to the master, but that would involve a public API change and is a discussion for later. For now, I propose a new option, 'spark.hadoop.ignoreInputErrors', which defaults to false to maintain the existing behaviour of terminating the job upon any exception from the Hadoop input libraries. When set to true, however, this option will simply log the error to the executor's log, skip the input split that caused the error, and continue processing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tigerquoll/spark SPARK-6593a Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5250
commit 37d2cbde5b67c94cac26e2c55030e9bb00678d7a Author: Dale tigerqu...@outlook.com Date: 2015-03-29T11:23:01Z [SPARK-6593] Added spark.hadoop.ignoreInputEerrors config option and implementation
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
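The ignore-and-continue behaviour proposed above can be sketched outside of Spark as a guard around per-split reading. This is only an illustration of the idea: `readSplit`, `readAll`, and the flag argument below are invented stand-ins, not the actual HadoopRDD code.

```scala
object SkipBadSplits {
  // Illustrative stand-in for reading one input split; throws on corrupt data,
  // the way a Hadoop input library would on a bad gzip member.
  def readSplit(split: String): Seq[String] =
    if (split.contains("corrupt")) throw new java.io.IOException(s"bad split: $split")
    else Seq(s"records-from-$split")

  // With ignoreInputErrors = true, a failing split is logged and skipped;
  // with the default false, the exception propagates and fails the whole job.
  def readAll(splits: Seq[String], ignoreInputErrors: Boolean): Seq[String] =
    splits.flatMap { split =>
      try readSplit(split)
      catch {
        case e: java.io.IOException if ignoreInputErrors =>
          Console.err.println(s"Skipping split after error: ${e.getMessage}")
          Seq.empty
      }
    }
}
```

The guard on the catch clause is the whole mechanism: when the flag is false the case does not match, so the existing fail-the-job behaviour is preserved.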
[GitHub] spark pull request: [Spark-6214][CORE] - A simple expression langu...
GitHub user tigerquoll opened a pull request: https://github.com/apache/spark/pull/4937 [Spark-6214][CORE] - A simple expression language to specify configuration options

This is a proposal to allow configuration options to be specified via a simple expression language. The language would have the following features:

* Allow basic arithmetic (+ - / *) with support for bracketed expressions and standard precedence rules.
* Support for, and normalisation of, common units of reference, e.g. MB, GB, seconds, minutes, hours, days and weeks.
* Allow the referencing of basic environmental information, currently defined as:
  numCores: number of cores assigned to the JVM
  physicalMemoryBytes: memory size of the hosting machine
  JVMTotalMemoryBytes: current bytes of memory allocated to the JVM
  JVMMaxMemoryBytes: maximum number of bytes of memory available to the JVM
  JVMFreeMemoryBytes: maxMemoryBytes - totalMemoryBytes
* Allow the limited referencing of other configuration values when specifying values. (Other configuration values must be initialised and explicitly passed into the expression evaluator for this functionality to be enabled.)

Such a feature would have the following end-user benefits:

* Flexibility to specify time intervals or byte quantities in appropriate, easy-to-follow units, e.g. 1 week rather than 604800 seconds.
* A consistent means of entering configuration information regardless of the option being set (questions such as "is this particular option specified in ms or seconds?" become irrelevant, because the user can pick whatever unit makes sense for the magnitude of the value they are specifying).
* The ability to scale a configuration option in relation to system attributes, e.g.
  SPARK_WORKER_CORES = numCores - 1
  SPARK_WORKER_MEMORY = physicalMemoryBytes - 1.5 GB
* The ability to scale multiple configuration options together, e.g.:
  spark.driver.memory = 0.75 * physicalMemoryBytes
  spark.driver.maxResultSize = spark.driver.memory * 0.8

This PR only contains the implementation of the expression language and associated unit tests (more than 120 unit tests). If this PR is accepted, the idea is that moving options over to use this expression language would be done in one or more follow-up PRs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tigerquoll/spark SPARK-6214 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/4937.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4937 commit 49e981005ae736b879f6a4330ec17d2828c74400 Author: Dale tigerqu...@outlook.com Date: 2015-02-26T22:53:46Z Initial checkin of basic expression parser commit c6f137a0233aace1a1b6221c9ea3b89a4c8929dc Author: Dale tigerqu...@outlook.com Date: 2015-03-01T06:02:30Z WIP for ByteExpressionParser commit 07b8b00570c70b474adde81477461bd06658237d Author: Dale tigerqu...@outlook.com Date: 2015-03-01T12:41:12Z WIP for TimeExpressionParser commit ae710cdb40d6e28ecee8e2add1d91748920cc182 Author: Dale tigerqu...@outlook.com Date: 2015-03-05T21:40:48Z Code tidy up, added some comments, created factory object commit 6a32ef9b812259ed42c047332d0f23c68d445531 Author: Dale tigerqu...@outlook.com Date: 2015-03-07T06:25:52Z Added Physical Memory function, more unit tests for quantity objects commit 5867704a3abcc23cd3f3c56a4de2b1df26390877 Author: Dale tigerqu...@outlook.com Date: 2015-03-07T06:26:29Z Added more unit tests to increase code coverage commit 5db9ef08e398c371a4dd65a0e2d6c0f5c5b4a6ef Author: Dale tigerqu...@outlook.com Date: 2015-03-07T06:27:21Z Refactored code commit
6795a8d70f96c22c229447ee727787389079a474 Author: Dale tigerqu...@outlook.com Date: 2015-03-07T06:45:56Z lint checks now pass commit c6cb7f13eed00d702c3d29d8e6bbd53f4acf35cb Author: Dale tigerqu...@outlook.com Date: 2015-03-07T06:49:27Z Quick import tidyup commit ede092d1f8b1049bb37a0aa022a24c6e0e14b39d Author: Dale tigerqu...@outlook.com Date: 2015-03-07T09:38:44Z Moved cpuCores to MachineInfoFunctions trait commit 5fd80e9828a2808f73ed72580827b37ffd69dc82 Author: Dale tigerqu...@outlook.com Date: 2015-03-07T09:57:11Z Updated comments --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA
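As an illustration of the unit-normalisation idea only (not the parser in this PR — the object name, unit tables, and regex below are invented for the sketch), quantities like `1.5 GB` or `1 week` can be reduced to base units with a small lookup table:

```scala
object ConfigUnits {
  // Byte units normalised to bytes (illustrative subset).
  private val byteUnits =
    Map("b" -> 1L, "kb" -> 1024L, "mb" -> 1024L * 1024, "gb" -> 1024L * 1024 * 1024)

  // Time units normalised to seconds; the lookup strips an optional plural 's',
  // so "week" and "weeks" resolve to the same entry.
  private val timeUnits =
    Map("second" -> 1L, "minute" -> 60L, "hour" -> 3600L, "day" -> 86400L, "week" -> 604800L)

  // A number followed by optional whitespace and a unit name, e.g. "1.5 GB".
  private val Quantity = """([0-9.]+)\s*([A-Za-z]+)""".r

  def toBytes(s: String): Long = s.trim match {
    case Quantity(n, unit) => (n.toDouble * byteUnits(unit.toLowerCase)).toLong
  }

  def toSeconds(s: String): Long = s.trim match {
    case Quantity(n, unit) => (n.toDouble * timeUnits(unit.toLowerCase.stripSuffix("s"))).toLong
  }
}
```

This normalisation step is what would let `1 week` and `604800 seconds` denote the same configuration value, regardless of which unit the user picks.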
[GitHub] spark pull request: spark-core - [SPARK-4787] - Stop sparkcontext ...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/3809#discussion_r22334486
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -329,8 +329,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     try {
       dagScheduler = new DAGScheduler(this)
     } catch {
-      case e: Exception => throw
-        new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
+      case e: Exception => {
+        stop()
--- End diff --
Excellent idea Josh.
[GitHub] spark pull request: spark-core - [SPARK-4787] - Stop sparkcontext ...
GitHub user tigerquoll opened a pull request: https://github.com/apache/spark/pull/3809 spark-core - [SPARK-4787] - Stop sparkcontext properly if a DAGScheduler init error occurs [SPARK-4787] Stop SparkContext properly if an exception occurs during DAGScheduler initialization. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tigerquoll/spark SPARK-4787 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3809.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3809 commit 217257879fe7c98673caf14b980790498887581e Author: Dale tigerqu...@outlook.com Date: 2014-12-26T09:33:05Z [SPARK-4787] Stop context properly if an exception occurs during DAGScheduler initialization.
[GitHub] spark pull request: SPARK-4567. Make SparkJobInfo and SparkStageIn...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/3426#issuecomment-68165724 Hey @JoshRosen @sryza, should this patch include a serialVersionUID attribute on the classes to be serialized, to make sure compiler quirks don't cause different UIDs to be generated for the classes?
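For context, pinning the UID in Scala looks like the sketch below. The class and its fields are invented for illustration, not the actual SparkStageInfo API:

```scala
import java.io._

// With an explicit @SerialVersionUID, recompiling the class (or compiling it
// with a different compiler) no longer changes the UID that the stream's
// version check compares against.
@SerialVersionUID(1L)
class StageInfoExample(val stageId: Int, val name: String) extends Serializable

object SerialRoundTrip {
  // Serialize and deserialize through an in-memory buffer.
  def roundTrip(info: StageInfoExample): StageInfoExample = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(info)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    in.readObject().asInstanceOf[StageInfoExample]
  }
}
```

Without the annotation, the JVM derives the UID from the class's members, which is exactly the compiler-dependent behaviour the comment above is worried about.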
[GitHub] spark pull request: SPARK-4567. Make SparkJobInfo and SparkStageIn...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/3426#issuecomment-68166415 http://stackoverflow.com/questions/285793/what-is-a-serialversionuid-and-why-should-i-use-it seems to be a good summary of the pros and cons of this approach.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/2516#issuecomment-66909268 No probs, it was actually a nice way of starting to poke through the code and figure out how things are put together. I'll stick to smaller jobs from now on. Regards, Dale.

> Date: Tue, 9 Dec 2014 19:11:25 -0800 From: notificati...@github.com To: sp...@noreply.github.com CC: tigerqu...@outlook.com Subject: Re: [spark] Spark Core - [SPARK-3620] - Refactor of SparkSubmit Argument parsing code (#2516)
> Hey @tigerquoll usually for large patches like this we require a design doc on the JIRA. Especially because the priority of this is not super important, I would recommend that we close this issue for now, and maybe open a new one later once there is a consensus on how we should restructure Spark submit. Thanks for your work so far. Reply to this email directly or view it on GitHub.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r19793754 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -83,216 +79,163 @@ object SparkSubmit { * (4) the main class for the child */ private[spark] def createLaunchEnv(args: SparkSubmitArguments) - : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { + : (mutable.ArrayBuffer[String], mutable.ArrayBuffer[String], Map[String, String], String) = { // Values to return -val childArgs = new ArrayBuffer[String]() -val childClasspath = new ArrayBuffer[String]() -val sysProps = new HashMap[String, String]() +val childArgs = new mutable.ArrayBuffer[String]() +val childClasspath = new mutable.ArrayBuffer[String]() +val sysProps = new mutable.HashMap[String, String]() var childMainClass = -// Set the cluster manager -val clusterManager: Int = args.master match { - case m if m.startsWith(yarn) = YARN - case m if m.startsWith(spark) = STANDALONE - case m if m.startsWith(mesos) = MESOS - case m if m.startsWith(local) = LOCAL - case _ = printErrorAndExit(Master must start with yarn, spark, mesos, or local); -1 -} - -// Set the deploy mode; default is client mode -var deployMode: Int = args.deployMode match { - case client | null = CLIENT - case cluster = CLUSTER - case _ = printErrorAndExit(Deploy mode must be either client or cluster); -1 -} - -// Because yarn-cluster and yarn-client encapsulate both the master -// and deploy mode, we have some logic to infer the master and deploy mode -// from each other if only one is specified, or exit early if they are at odds. -if (clusterManager == YARN) { - if (args.master == yarn-standalone) { -printWarning(\yarn-standalone\ is deprecated. Use \yarn-cluster\ instead.) 
-args.master = yarn-cluster - } - (args.master, args.deployMode) match { -case (yarn-cluster, null) = - deployMode = CLUSTER -case (yarn-cluster, client) = - printErrorAndExit(Client deploy mode is not compatible with master \yarn-cluster\) -case (yarn-client, cluster) = - printErrorAndExit(Cluster deploy mode is not compatible with master \yarn-client\) -case (_, mode) = - args.master = yarn- + Option(mode).getOrElse(client) - } - +if (args.clusterManagerFlag == CM_YARN) { // Make sure YARN is included in our build if we're trying to use it if (!Utils.classIsLoadable(org.apache.spark.deploy.yarn.Client) !Utils.isTesting) { printErrorAndExit( Could not load YARN classes. + This copy of Spark may not have been compiled with YARN support.) } -} - -// The following modes are not supported or applicable -(clusterManager, deployMode) match { - case (MESOS, CLUSTER) = -printErrorAndExit(Cluster deploy mode is currently not supported for Mesos clusters.) - case (_, CLUSTER) if args.isPython = -printErrorAndExit(Cluster deploy mode is currently not supported for python applications.) - case (_, CLUSTER) if isShell(args.primaryResource) = -printErrorAndExit(Cluster deploy mode is not applicable to Spark shells.) - case _ = + val hasHadoopEnv = sys.env.contains(HADOOP_CONF_DIR) || sys.env.contains(YARN_CONF_DIR) + if (!hasHadoopEnv !Utils.isTesting) { +throw new Exception(When running with master ' + args.master + ' + + either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.) + } } // If we're running a python app, set the main class to our specific python runner if (args.isPython) { if (args.primaryResource == PYSPARK_SHELL) { -args.mainClass = py4j.GatewayServer -args.childArgs = ArrayBuffer(--die-on-broken-pipe, 0) +args.mainClass = PY4J_GATEWAYSERVER +args.childArgs = mutable.ArrayBuffer(--die-on-broken-pipe, 0) } else { // If a python file is provided, add it to the child arguments and list of files to deploy. 
// Usage: PythonAppRunner main python file extra python files [app arguments] -args.mainClass = org.apache.spark.deploy.PythonRunner -args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs -args.files = mergeFileLists(args.files, args.primaryResource) +args.mainClass = PYTHON_RUNNER
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r19793870
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -33,30 +34,25 @@ import org.apache.spark.util.Utils
  * a layer over the different cluster managers and deploy modes that Spark supports.
  */
 object SparkSubmit {
-
-  // Cluster managers
-  private val YARN = 1
-  private val STANDALONE = 2
-  private val MESOS = 4
-  private val LOCAL = 8
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
-
-  // Deploy modes
-  private val CLIENT = 1
-  private val CLUSTER = 2
-  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
-
   // A special jar name that indicates the class being run is inside of Spark itself, and therefore
   // no user jar is needed.
-  private val SPARK_INTERNAL = "spark-internal"
+  val SPARK_INTERNAL = "spark-internal"

   // Special primary resource names that represent shells rather than application jars.
-  private val SPARK_SHELL = "spark-shell"
-  private val PYSPARK_SHELL = "pyspark-shell"
+  val SPARK_SHELL = "spark-shell"
+  val PYSPARK_SHELL = "pyspark-shell"
+
+  // Special python classes
+  val PY4J_GATEWAYSERVER: String = "py4j.GatewayServer"
+  val PYTHON_RUNNER: String = "org.apache.spark.deploy.PythonRunner"
--- End diff --
done.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r19794246 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -83,216 +79,163 @@ object SparkSubmit { * (4) the main class for the child */ private[spark] def createLaunchEnv(args: SparkSubmitArguments) - : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { + : (mutable.ArrayBuffer[String], mutable.ArrayBuffer[String], Map[String, String], String) = { // Values to return -val childArgs = new ArrayBuffer[String]() -val childClasspath = new ArrayBuffer[String]() -val sysProps = new HashMap[String, String]() +val childArgs = new mutable.ArrayBuffer[String]() +val childClasspath = new mutable.ArrayBuffer[String]() +val sysProps = new mutable.HashMap[String, String]() var childMainClass = -// Set the cluster manager -val clusterManager: Int = args.master match { - case m if m.startsWith(yarn) = YARN - case m if m.startsWith(spark) = STANDALONE - case m if m.startsWith(mesos) = MESOS - case m if m.startsWith(local) = LOCAL - case _ = printErrorAndExit(Master must start with yarn, spark, mesos, or local); -1 -} - -// Set the deploy mode; default is client mode -var deployMode: Int = args.deployMode match { - case client | null = CLIENT - case cluster = CLUSTER - case _ = printErrorAndExit(Deploy mode must be either client or cluster); -1 -} - -// Because yarn-cluster and yarn-client encapsulate both the master -// and deploy mode, we have some logic to infer the master and deploy mode -// from each other if only one is specified, or exit early if they are at odds. -if (clusterManager == YARN) { - if (args.master == yarn-standalone) { -printWarning(\yarn-standalone\ is deprecated. Use \yarn-cluster\ instead.) 
-args.master = yarn-cluster - } - (args.master, args.deployMode) match { -case (yarn-cluster, null) = - deployMode = CLUSTER -case (yarn-cluster, client) = - printErrorAndExit(Client deploy mode is not compatible with master \yarn-cluster\) -case (yarn-client, cluster) = - printErrorAndExit(Cluster deploy mode is not compatible with master \yarn-client\) -case (_, mode) = - args.master = yarn- + Option(mode).getOrElse(client) - } - +if (args.clusterManagerFlag == CM_YARN) { // Make sure YARN is included in our build if we're trying to use it if (!Utils.classIsLoadable(org.apache.spark.deploy.yarn.Client) !Utils.isTesting) { printErrorAndExit( Could not load YARN classes. + This copy of Spark may not have been compiled with YARN support.) } -} - -// The following modes are not supported or applicable -(clusterManager, deployMode) match { - case (MESOS, CLUSTER) = -printErrorAndExit(Cluster deploy mode is currently not supported for Mesos clusters.) - case (_, CLUSTER) if args.isPython = -printErrorAndExit(Cluster deploy mode is currently not supported for python applications.) - case (_, CLUSTER) if isShell(args.primaryResource) = -printErrorAndExit(Cluster deploy mode is not applicable to Spark shells.) - case _ = + val hasHadoopEnv = sys.env.contains(HADOOP_CONF_DIR) || sys.env.contains(YARN_CONF_DIR) + if (!hasHadoopEnv !Utils.isTesting) { +throw new Exception(When running with master ' + args.master + ' + + either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.) + } } // If we're running a python app, set the main class to our specific python runner if (args.isPython) { if (args.primaryResource == PYSPARK_SHELL) { -args.mainClass = py4j.GatewayServer -args.childArgs = ArrayBuffer(--die-on-broken-pipe, 0) +args.mainClass = PY4J_GATEWAYSERVER +args.childArgs = mutable.ArrayBuffer(--die-on-broken-pipe, 0) } else { // If a python file is provided, add it to the child arguments and list of files to deploy. 
// Usage: PythonAppRunner main python file extra python files [app arguments] -args.mainClass = org.apache.spark.deploy.PythonRunner -args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs -args.files = mergeFileLists(args.files, args.primaryResource) +args.mainClass = PYTHON_RUNNER
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r19794638 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -17,201 +17,286 @@ package org.apache.spark.deploy -import java.io.{File, FileInputStream, IOException} -import java.util.Properties +import java.io._ import java.util.jar.JarFile -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection._ import org.apache.spark.SparkException +import org.apache.spark.deploy.ConfigConstants._ import org.apache.spark.util.Utils +import org.apache.spark.deploy.SparkSubmitArguments._ /** - * Parses and encapsulates arguments from the spark-submit script. - * The env argument is used for testing. - */ -private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) { - var master: String = null - var deployMode: String = null - var executorMemory: String = null - var executorCores: String = null - var totalExecutorCores: String = null - var propertiesFile: String = null - var driverMemory: String = null - var driverExtraClassPath: String = null - var driverExtraLibraryPath: String = null - var driverExtraJavaOptions: String = null - var driverCores: String = null - var supervise: Boolean = false - var queue: String = null - var numExecutors: String = null - var files: String = null - var archives: String = null - var mainClass: String = null - var primaryResource: String = null - var name: String = null - var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() - var jars: String = null - var verbose: Boolean = false - var isPython: Boolean = false - var pyFiles: String = null - val sparkProperties: HashMap[String, String] = new HashMap[String, String]() - - /** Default properties present in the currently defined defaults file. 
*/ - lazy val defaultSparkProperties: HashMap[String, String] = { -val defaultProperties = new HashMap[String, String]() -if (verbose) SparkSubmit.printStream.println(sUsing properties file: $propertiesFile) -Option(propertiesFile).foreach { filename = - val file = new File(filename) - SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) = -if (k.startsWith(spark)) { - defaultProperties(k) = v - if (verbose) SparkSubmit.printStream.println(sAdding default property: $k=$v) -} else { - SparkSubmit.printWarning(sIgnoring non-spark config property: $k=$v) -} - } -} -defaultProperties + * Pulls and validates configuration information together in order of priority + * + * Entries in the conf Map will be filled in the following priority order + * 1. entries specified on the command line (except from --conf entries) + * 2. Entries specified on the command line with --conf + * 3. Legacy environment variables + * 4 SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if either exist + * 5. hard coded defaults + * +*/ +private[spark] class SparkSubmitArguments(args: Seq[String]) { + /** + * Stores all configuration items except for child arguments, + * referenced by the constants defined in ConfigConstants.scala. 
+ */ + val conf = new mutable.HashMap[String, String]() + + def master = conf(SPARK_MASTER) + def master_= (value: String):Unit = conf.put(SPARK_MASTER, value) + + def executorMemory = conf(SPARK_EXECUTOR_MEMORY) + def executorMemory_= (value: String):Unit = conf.put(SPARK_EXECUTOR_MEMORY, value) + + def executorCores = conf(SPARK_EXECUTOR_CORES) + def executorCores_= (value: String):Unit = conf.put(SPARK_EXECUTOR_CORES, value) + + def totalExecutorCores = conf.get(SPARK_CORES_MAX) + def totalExecutorCores_= (value: String):Unit = conf.put(SPARK_CORES_MAX, value) + + def driverMemory = conf(SPARK_DRIVER_MEMORY) + def driverMemory_= (value: String):Unit = conf.put(SPARK_DRIVER_MEMORY, value) + + def driverExtraClassPath = conf.get(SPARK_DRIVER_EXTRA_CLASSPATH) + def driverExtraClassPath_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_CLASSPATH, value) + + def driverExtraLibraryPath = conf.get(SPARK_DRIVER_EXTRA_LIBRARY_PATH) + def driverExtraLibraryPath_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value) + + def driverExtraJavaOptions = conf.get(SPARK_DRIVER_EXTRA_JAVA_OPTIONS) + def driverExtraJavaOptions_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r19795115 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -17,201 +17,286 @@ package org.apache.spark.deploy -import java.io.{File, FileInputStream, IOException} -import java.util.Properties +import java.io._ import java.util.jar.JarFile -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection._ import org.apache.spark.SparkException +import org.apache.spark.deploy.ConfigConstants._ import org.apache.spark.util.Utils +import org.apache.spark.deploy.SparkSubmitArguments._ /** - * Parses and encapsulates arguments from the spark-submit script. - * The env argument is used for testing. - */ -private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) { - var master: String = null - var deployMode: String = null - var executorMemory: String = null - var executorCores: String = null - var totalExecutorCores: String = null - var propertiesFile: String = null - var driverMemory: String = null - var driverExtraClassPath: String = null - var driverExtraLibraryPath: String = null - var driverExtraJavaOptions: String = null - var driverCores: String = null - var supervise: Boolean = false - var queue: String = null - var numExecutors: String = null - var files: String = null - var archives: String = null - var mainClass: String = null - var primaryResource: String = null - var name: String = null - var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() - var jars: String = null - var verbose: Boolean = false - var isPython: Boolean = false - var pyFiles: String = null - val sparkProperties: HashMap[String, String] = new HashMap[String, String]() - - /** Default properties present in the currently defined defaults file. 
*/ - lazy val defaultSparkProperties: HashMap[String, String] = { -val defaultProperties = new HashMap[String, String]() -if (verbose) SparkSubmit.printStream.println(sUsing properties file: $propertiesFile) -Option(propertiesFile).foreach { filename = - val file = new File(filename) - SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) = -if (k.startsWith(spark)) { - defaultProperties(k) = v - if (verbose) SparkSubmit.printStream.println(sAdding default property: $k=$v) -} else { - SparkSubmit.printWarning(sIgnoring non-spark config property: $k=$v) -} - } -} -defaultProperties + * Pulls and validates configuration information together in order of priority + * + * Entries in the conf Map will be filled in the following priority order + * 1. entries specified on the command line (except from --conf entries) + * 2. Entries specified on the command line with --conf + * 3. Legacy environment variables + * 4 SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if either exist + * 5. hard coded defaults + * +*/ +private[spark] class SparkSubmitArguments(args: Seq[String]) { + /** + * Stores all configuration items except for child arguments, + * referenced by the constants defined in ConfigConstants.scala. 
+ */ + val conf = new mutable.HashMap[String, String]() + + def master = conf(SPARK_MASTER) + def master_= (value: String):Unit = conf.put(SPARK_MASTER, value) + + def executorMemory = conf(SPARK_EXECUTOR_MEMORY) + def executorMemory_= (value: String):Unit = conf.put(SPARK_EXECUTOR_MEMORY, value) + + def executorCores = conf(SPARK_EXECUTOR_CORES) + def executorCores_= (value: String):Unit = conf.put(SPARK_EXECUTOR_CORES, value) + + def totalExecutorCores = conf.get(SPARK_CORES_MAX) + def totalExecutorCores_= (value: String):Unit = conf.put(SPARK_CORES_MAX, value) + + def driverMemory = conf(SPARK_DRIVER_MEMORY) + def driverMemory_= (value: String):Unit = conf.put(SPARK_DRIVER_MEMORY, value) + + def driverExtraClassPath = conf.get(SPARK_DRIVER_EXTRA_CLASSPATH) + def driverExtraClassPath_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_CLASSPATH, value) + + def driverExtraLibraryPath = conf.get(SPARK_DRIVER_EXTRA_LIBRARY_PATH) + def driverExtraLibraryPath_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value) + + def driverExtraJavaOptions = conf.get(SPARK_DRIVER_EXTRA_JAVA_OPTIONS) + def driverExtraJavaOptions_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r19795245 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -227,91 +312,92 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St */ def parse(opts: Seq[String]): Unit = opts match { case (--name) :: value :: tail = -name = value +cmdLineConfig.put(SPARK_APP_NAME, value) parse(tail) case (--master) :: value :: tail = -master = value +cmdLineConfig.put(SPARK_MASTER, value) parse(tail) case (--class) :: value :: tail = -mainClass = value +cmdLineConfig.put(SPARK_APP_CLASS, value) parse(tail) case (--deploy-mode) :: value :: tail = -if (value != client value != cluster) { - SparkSubmit.printErrorAndExit(--deploy-mode must be either \client\ or \cluster\) -} -deployMode = value +cmdLineConfig.put(SPARK_DEPLOY_MODE, value) parse(tail) case (--num-executors) :: value :: tail = -numExecutors = value +cmdLineConfig.put(SPARK_EXECUTOR_INSTANCES, value) parse(tail) case (--total-executor-cores) :: value :: tail = -totalExecutorCores = value +cmdLineConfig.put(SPARK_CORES_MAX, value) parse(tail) case (--executor-cores) :: value :: tail = -executorCores = value +cmdLineConfig.put(SPARK_EXECUTOR_CORES, value) parse(tail) case (--executor-memory) :: value :: tail = -executorMemory = value +cmdLineConfig.put(SPARK_EXECUTOR_MEMORY, value) parse(tail) case (--driver-memory) :: value :: tail = -driverMemory = value +cmdLineConfig.put(SPARK_DRIVER_MEMORY, value) parse(tail) case (--driver-cores) :: value :: tail = -driverCores = value +cmdLineConfig.put(SPARK_DRIVER_CORES, value) parse(tail) case (--driver-class-path) :: value :: tail = -driverExtraClassPath = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_CLASSPATH, value) parse(tail) case (--driver-java-options) :: value :: tail = -driverExtraJavaOptions = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value) parse(tail) case 
(--driver-library-path) :: value :: tail = -driverExtraLibraryPath = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value) parse(tail) case (--properties-file) :: value :: tail = -propertiesFile = value +/* We merge the property file config options into the rest of the command lines options + * after we have finished the rest of the command line processing as property files + * cannot override explicit command line options . + */ +cmdLinePropertyFileValues ++= Utils.getPropertyValuesFromFile(value) --- End diff -- I've changed back to the old behaviour with the most recent merge from current, but I will print a warning to the user about what is going on if we detect multiple property-file options.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/2516#issuecomment-58943431 Interesting. Can you give any references as to what single-slash file URIs mean? Neither RFC 1738 nor RFC 1630 seems to mention them, and http://en.wikipedia.org/wiki/File_URI_scheme only mentions them in passing, noting that some web browsers allow them even though it is against the spec.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/2516#issuecomment-58775682 Hi @vanzin, I've implemented your suggestions, tidied up the code further, and added more unit tests to flesh out the test coverage.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/2516#issuecomment-58775741 @andrewor14 we could be stepping on each other's toes soon. I have a query about your work on Utils.ResolveUris: I notice that you produce and test for file URIs with a single forward slash (file:/foo/bar). Shouldn't they have two forward slashes (file://foo/bar)? I've left my unit tests expecting a single forward slash for now.
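The question above comes down to what the slashes in a URI denote. Under the generic URI syntax, `//` introduces an authority (host) component, which is easy to demonstrate with Python's standard URI parser (a sketch of the general URI rules, not of Spark's `Utils` code):

```python
from urllib.parse import urlparse

# file://foo/bar : the "//" starts an authority component, so "foo" is
# parsed as a host name and only "/bar" is the path.
two_slash = urlparse("file://foo/bar")

# file:/foo/bar : no authority at all; the whole "/foo/bar" is the path.
one_slash = urlparse("file:/foo/bar")

# file:///foo/bar : an empty authority (conventionally the local host),
# with path "/foo/bar". This is the usual spelling for local files.
three_slash = urlparse("file:///foo/bar")

print(two_slash.netloc, two_slash.path)      # foo /bar
print(one_slash.netloc, one_slash.path)      #  /foo/bar
print(three_slash.netloc, three_slash.path)  #  /foo/bar
```

So `file://foo/bar` is not a two-slash spelling of a local path at all: it names the file `/bar` on host `foo`, which is why the single-slash and triple-slash forms are the two that actually denote local files.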
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18745897

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1479,6 +1479,14 @@ private[spark] object Utils extends Logging {
     PropertyConfigurator.configure(pro)
   }
+  /**
+   * Flatten a map of maps out into a single map, later maps in the propList
--- End diff --

fixed
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18209006

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 package org.apache.spark.deploy

-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties

+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}

+import org.apache.commons.lang3.CharEncoding

-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils

 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority.
+ *
+ * Entries in the conf Map will be filled in the following priority order:
+ * 1. Entries specified on the command line (except --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (e.g. by using -Dspark.var.name)
+ * 5. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if either exists
+ * 6. Hard-coded defaults on the class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in, and the properties
+ * are considered to be at the priority of the method that specified the file. A property
+ * specified in a property file will not override an existing config value at that same level.
+ */
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-    val defaultProperties = new HashMap[String, String]()
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-    Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
-          defaultProperties(k) = v
-          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-        } else {
-          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-        }
-      }
-    }
-    defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master = conf(SparkMaster)
+  def master_= (value: String): Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String): Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String): Unit = conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String): Unit = conf.put(SparkExecutorCores, value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String): Unit = conf.put(SparkCoresMax, value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String): Unit = conf.put(SparkDriverMemory, value)
+
+  def driverExtraClassPath
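The accessor pattern in the diff above (a `def master` getter and `def master_=` setter that both delegate to the single `conf` map) keeps field-style call sites working while storing everything in one place. A rough Python equivalent using properties, with the key name assumed for illustration:

```python
SPARK_MASTER = "spark.master"  # stand-in for the ConfigConstants value

class SparkSubmitArguments:
    """All settings live in one dict; named accessors delegate to it."""

    def __init__(self):
        self.conf = {}

    @property
    def master(self):
        # Getter reads through to the shared config map.
        return self.conf[SPARK_MASTER]

    @master.setter
    def master(self, value):
        # Setter writes through to the shared config map.
        self.conf[SPARK_MASTER] = value

args = SparkSubmitArguments()
args.master = "local[*]"        # looks like a plain field write
print(args.conf[SPARK_MASTER])  # but it updated the shared map
```

The design point is the same in both languages: callers keep a simple attribute interface, while the class gains a single uniform map it can merge against other configuration sources.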
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18148421

--- Diff: core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
@@ -0,0 +1,18 @@
+spark.master = local[*]
--- End diff --

Ok, will do
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18148846

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   }
 }

-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves configuration sources in order of highest to lowest:
+   * 1. Each map passed in as additionalConfig, from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (e.g. by using -Dspark.var.name)
+   * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf
+   * 5. Hard-coded defaults on the class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read in, and the properties
+   * are considered to be at the priority of the method that specified the file. A property
+   * specified in a property file will not override an existing config value at that same level.
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue], in order of
+   *                          highest priority to lowest; these have priority over internal sources
+   * @return Map[propName -> propValue] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq[Map[String, String]]) = {
+    // Configuration read in from the spark-submit-defaults.prop file found on the classpath
+    var hardCodedDefaultConfig: Option[Map[String, String]] = None
--- End diff --

Ok, nuked the property file read in from the classpath and am now using a default value map.
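The merge the doc comment above describes (sources ordered from highest to lowest priority, where a value set at a higher level is never overwritten by a lower one) boils down to a first-writer-wins fold over the ordered source maps. A small Python sketch of that rule, not the actual Spark code:

```python
def merge_spark_properties(config_sources):
    """Merge config maps ordered highest priority first.

    A key already set by a higher-priority source is never overwritten,
    matching "will not override an existing config value".
    """
    merged = {}
    for source in config_sources:
        for key, value in source.items():
            # setdefault only writes if no higher-priority source set the key.
            merged.setdefault(key, value)
    return merged

cmd_line = {"spark.master": "yarn"}
env_vars = {"spark.master": "local[*]", "spark.executor.memory": "2g"}
defaults = {"spark.executor.memory": "1g", "spark.app.name": "default"}

merged = merge_spark_properties([cmd_line, env_vars, defaults])
# spark.master comes from cmd_line; executor memory from env_vars;
# only the app name falls all the way through to the defaults.
```

Listing sources highest-first and folding with first-writer-wins is equivalent to the more common lowest-first fold with last-writer-wins; the ordering here just mirrors the numbering used in the doc comment.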
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18148885

--- Diff: core/src/test/scala/org/apache/spark/deploy/MergablePropertiesTest.scala ---
@@ -0,0 +1,55 @@
+package org.apache.spark.deploy
--- End diff --

added
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18190759

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   }
 }

-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves configuration sources in order of highest to lowest:
+   * 1. Each map passed in as additionalConfig, from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (e.g. by using -Dspark.var.name)
+   * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf
+   * 5. Hard-coded defaults on the class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read in, and the properties
+   * are considered to be at the priority of the method that specified the file. A property
+   * specified in a property file will not override an existing config value at that same level.
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue], in order of
+   *                          highest priority to lowest; these have priority over internal sources
+   * @return Map[propName -> propValue] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq[Map[String, String]]) = {
+    // Configuration read in from the spark-submit-defaults.prop file found on the classpath
+    var hardCodedDefaultConfig: Option[Map[String, String]] = None
+    var is: InputStream = null
+    var isr: Option[InputStreamReader] = None
     try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
+      is = Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+      // only open an InputStreamReader if the InputStream was successfully opened
+      isr = Option(is).map { is: InputStream =>
+        new InputStreamReader(is, CharEncoding.UTF_8)
+      }
+
+      hardCodedDefaultConfig = isr.map(defaultValueStream =>
+        SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
     } finally {
-      inputStream.close()
+      Option(is).foreach(_.close)
+      isr.foreach(_.close)
     }
+
+    if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) {
+      throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+    }
+
+    // Configuration read in from the defaults file if it exists
+    var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+    if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile)) {
+      SparkSubmitArguments.getPropertyValuesFromFile(
+        sparkDefaultConfig.get(SparkPropertiesFile).get)
+    } else {
+      Map.empty
+    }
+
+    // Configuration from Java system properties
+    val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+    // Configuration variables from the environment
+    // support legacy variables
+    val environmentConfig = System.getenv().asScala
+
+    val legacyEnvVars = Seq("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode,
+      "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory)
+
+    // legacy variables act at the priority of a system property
+    val propsWithEnvVars: mutable.Map[String, String] = new mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+      .map { case (varName, propName) => (environmentConfig.get(varName), propName) }
+      .filter { case (varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty }
+      .map { case (varVariable, propName) => (propName, varVariable.get) }
+
+    val ConfigSources = additionalConfigs ++ Seq(
+      environmentConfig,
+      propsWithEnvVars,
+      sparkDefaultConfig,
+      hardCodedDefaultConfig.get
+    )
+
+    // Load properties file
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18191705 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- (same mergeSparkProperties hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18128789 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- (same mergeSparkProperties hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18128856 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- (same mergeSparkProperties hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18128941 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { } } -object SparkSubmitArguments { - /** Load properties present in the given file. */ - def getPropertiesFromFile(file: File): Seq[(String, String)] = { -require(file.exists(), s"Properties file $file does not exist") -require(file.isFile(), s"Properties file $file is not a normal file") -val inputStream = new FileInputStream(file) +private[spark] object SparkSubmitArguments { + /** + * Resolves configuration sources in order of highest to lowest priority: + * 1. Each map passed in as additionalConfig, from first to last + * 2. Environment variables (including legacy variable mappings) + * 3. System config variables (eg by using -Dspark.var.name) + * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf + * 5. Hard-coded defaults in class path at spark-submit-defaults.prop + * + * A property file specified by one of the means listed above gets read in and the properties are + * considered to be at the priority of the method that specified the file. + * A property specified in a property file will not override an existing + * config value at that same level + * + * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue] in order of highest + * priority to lowest; these have priority over internal sources + * @return Map[propName -> propFile] containing values merged from all sources in order of priority + */ + def mergeSparkProperties(additionalConfigs: Seq[Map[String, String]]) = { +// Configuration read in from spark-submit-defaults.prop file found on the classpath +var hardCodedDefaultConfig: Option[Map[String, String]] = None +var is: InputStream = null +var isr: Option[InputStreamReader] = None try { - val properties = new Properties() - properties.load(inputStream) - properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim)) -} catch { - case e: IOException => -val message = s"Failed when loading Spark properties file $file" -throw new SparkException(message, e) + is = Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults) + + // only open InputStreamReader if InputStream was successfully opened + isr = Option(is).map { is: InputStream => +new InputStreamReader(is, CharEncoding.UTF_8) + } + + hardCodedDefaultConfig = isr.map( defaultValueStream => + SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream)) } finally { - inputStream.close() + Option(is).foreach(_.close) + isr.foreach(_.close) } + +if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) { + throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults") +} + +// Configuration read in from defaults file if it exists +var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig + +if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile)) { + SparkSubmitArguments.getPropertyValuesFromFile( + sparkDefaultConfig.get(SparkPropertiesFile).get) +} else { + Map.empty +} + +// Configuration from java system properties +val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties) + +// Configuration variables from the environment +// support legacy variables +val environmentConfig = System.getenv().asScala + +val legacyEnvVars = Seq("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode, + "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory) + +// legacy variables act at the priority of a system property --- End diff -- OK, my way was even getting me confused. Let's use your suggested code and treat legacy env variables at the same priority as normal environment variables. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
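The merge order being negotiated above boils down to one rule: walk the sources from highest priority to lowest, and let each lower-priority source contribute only keys that nobody above it has already claimed. A self-contained sketch of that rule (the map contents and key names are illustrative, not Spark's actual constants):

```scala
// Hedged sketch of priority-ordered config merging: `sources` is ordered
// highest priority first; a lower-priority map never overrides a key
// that a higher-priority map already supplied.
def mergeByPriority(sources: Seq[Map[String, String]]): Map[String, String] =
  sources.foldLeft(Map.empty[String, String]) { (merged, source) =>
    // `++` keeps the right-hand operand on key clashes, so `merged` wins
    source ++ merged
  }

val commandLine = Map("spark.master" -> "local[4]")
val environment = Map("spark.master" -> "yarn", "spark.executor.memory" -> "2g")
val defaults    = Map("spark.executor.memory" -> "1g", "spark.app.name" -> "demo")

val merged = mergeByPriority(Seq(commandLine, environment, defaults))
```

With this ordering, `spark.master` comes from the command line, `spark.executor.memory` from the environment, and `spark.app.name` falls through to the defaults, which matches the priority list in the quoted Scaladoc.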
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18133582 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -17,155 +17,195 @@ package org.apache.spark.deploy -import java.io.{File, FileInputStream, IOException} -import java.util.Properties +import java.io.{InputStreamReader, File, FileInputStream, InputStream} import java.util.jar.JarFile +import java.util.Properties +import scala.collection._ +import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap} +import org.apache.commons.lang3.CharEncoding -import org.apache.spark.SparkException +import org.apache.spark.deploy.ConfigConstants._ import org.apache.spark.util.Utils + + /** - * Parses and encapsulates arguments from the spark-submit script. - */ + * Pulls configuration information together in order of priority. + * + * Entries in the conf Map will be filled in the following priority order: + * 1. Entries specified on the command line (except from --conf entries) + * 2. Entries specified on the command line with --conf + * 3. Environment variables (including legacy variable mappings) + * 4. System config variables (eg by using -Dspark.var.name) + * 5. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if either exist + * 6. Hard-coded defaults in class path at spark-submit-defaults.prop + * + * A property file specified by one of the means listed above gets read in and the properties are + * considered to be at the priority of the method that specified the file. A property specified in + * a property file will not override an existing config value at that same level +*/ private[spark] class SparkSubmitArguments(args: Seq[String]) { - var master: String = null - var deployMode: String = null - var executorMemory: String = null - var executorCores: String = null - var totalExecutorCores: String = null - var propertiesFile: String = null - var driverMemory: String = null - var driverExtraClassPath: String = null - var driverExtraLibraryPath: String = null - var driverExtraJavaOptions: String = null - var driverCores: String = null - var supervise: Boolean = false - var queue: String = null - var numExecutors: String = null - var files: String = null - var archives: String = null - var mainClass: String = null - var primaryResource: String = null - var name: String = null - var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() - var jars: String = null - var verbose: Boolean = false - var isPython: Boolean = false - var pyFiles: String = null - val sparkProperties: HashMap[String, String] = new HashMap[String, String]() - - /** Default properties present in the currently defined defaults file. */ - lazy val defaultSparkProperties: HashMap[String, String] = { -val defaultProperties = new HashMap[String, String]() -if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile") -Option(propertiesFile).foreach { filename => - val file = new File(filename) - SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) => -if (k.startsWith("spark")) { - defaultProperties(k) = v - if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v") -} else { - SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v") -} - } -} -defaultProperties - } + /** + * Stores all configuration items except for child arguments, + * referenced by the constants defined in ConfigConstants.scala + */ + val conf = new mutable.HashMap[String, String]() + + def master = conf(SparkMaster) + def master_= (value: String): Unit = conf.put(SparkMaster, value) + + def deployMode = conf(SparkDeployMode) + def deployMode_= (value: String): Unit = conf.put(SparkDeployMode, value) + + def executorMemory = conf(SparkExecutorMemory) + def executorMemory_= (value: String): Unit = conf.put(SparkExecutorMemory, value) + + def executorCores = conf(SparkExecutorCores) + def executorCores_= (value: String): Unit = conf.put(SparkExecutorCores, value) + + def totalExecutorCores = conf.get(SparkCoresMax) + def totalExecutorCores_= (value: String): Unit = conf.put(SparkCoresMax, value) + + def driverMemory = conf(SparkDriverMemory) + def driverMemory_= (value: String): Unit = conf.put(SparkDriverMemory, value) + + def driverExtraClassPath
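The accessor pattern in this diff (paired `def x` / `def x_=` methods over a single backing map) works because Scala rewrites an assignment `obj.x = v` into the method call `obj.x_=(v)`. A minimal self-contained sketch of the idea, using a hypothetical key name rather than Spark's `ConfigConstants`:

```scala
import scala.collection.mutable

class SubmitConf {
  // single backing store for every configuration item
  val conf = new mutable.HashMap[String, String]()

  // `sc.master = "..."` desugars to `sc.master_=("...")`,
  // so callers keep field-like syntax while the map stays authoritative
  def master: String = conf("spark.master")
  def master_=(value: String): Unit = conf.put("spark.master", value)
}

val sc = new SubmitConf
sc.master = "yarn-cluster"
```

The payoff, as in the PR, is that code which previously mutated two dozen separate `var`s now funnels through one map that can be merged, printed, or validated in a single place.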
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18133601 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { } } -object SparkSubmitArguments { - /** Load properties present in the given file. */ - def getPropertiesFromFile(file: File): Seq[(String, String)] = { -require(file.exists(), s"Properties file $file does not exist") -require(file.isFile(), s"Properties file $file is not a normal file") -val inputStream = new FileInputStream(file) +private[spark] object SparkSubmitArguments { + /** + * Resolves configuration sources in order of highest to lowest priority: + * 1. Each map passed in as additionalConfig, from first to last + * 2. Environment variables (including legacy variable mappings) + * 3. System config variables (eg by using -Dspark.var.name) + * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf + * 5. Hard-coded defaults in class path at spark-submit-defaults.prop + * + * A property file specified by one of the means listed above gets read in and the properties are + * considered to be at the priority of the method that specified the file. + * A property specified in a property file will not override an existing + * config value at that same level + * + * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue] in order of highest + * priority to lowest; these have priority over internal sources + * @return Map[propName -> propFile] containing values merged from all sources in order of priority + */ + def mergeSparkProperties(additionalConfigs: Seq[Map[String, String]]) = { +// Configuration read in from spark-submit-defaults.prop file found on the classpath +var hardCodedDefaultConfig: Option[Map[String, String]] = None +var is: InputStream = null +var isr: Option[InputStreamReader] = None try { - val properties = new Properties() - properties.load(inputStream) - properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim)) -} catch { - case e: IOException => -val message = s"Failed when loading Spark properties file $file" -throw new SparkException(message, e) + is = Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults) + + // only open InputStreamReader if InputStream was successfully opened + isr = Option(is).map { is: InputStream => +new InputStreamReader(is, CharEncoding.UTF_8) + } + + hardCodedDefaultConfig = isr.map( defaultValueStream => + SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream)) } finally { - inputStream.close() + Option(is).foreach(_.close) + isr.foreach(_.close) } + +if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) { + throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults") +} + +// Configuration read in from defaults file if it exists +var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig + +if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile)) { + SparkSubmitArguments.getPropertyValuesFromFile( + sparkDefaultConfig.get(SparkPropertiesFile).get) +} else { + Map.empty +} + +// Configuration from java system properties +val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties) + +// Configuration variables from the environment +// support legacy variables +val environmentConfig = System.getenv().asScala + +val legacyEnvVars = Seq("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode, + "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory) + +// legacy variables act at the priority of a system property +val propsWithEnvVars: mutable.Map[String, String] = new mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars + .map { case (varName, propName) => (environmentConfig.get(varName), propName) } + .filter { case (varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty } + .map { case (varVariable, propName) => (propName, varVariable.get) } + +val ConfigSources = additionalConfigs ++ Seq ( --- End diff -- changed
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18134461 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { } } -object SparkSubmitArguments { - /** Load properties present in the given file. */ - def getPropertiesFromFile(file: File): Seq[(String, String)] = { -require(file.exists(), s"Properties file $file does not exist") -require(file.isFile(), s"Properties file $file is not a normal file") -val inputStream = new FileInputStream(file) +private[spark] object SparkSubmitArguments { + /** + * Resolves configuration sources in order of highest to lowest priority: + * 1. Each map passed in as additionalConfig, from first to last + * 2. Environment variables (including legacy variable mappings) + * 3. System config variables (eg by using -Dspark.var.name) + * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf + * 5. Hard-coded defaults in class path at spark-submit-defaults.prop + * + * A property file specified by one of the means listed above gets read in and the properties are + * considered to be at the priority of the method that specified the file. + * A property specified in a property file will not override an existing + * config value at that same level + * + * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue] in order of highest + * priority to lowest; these have priority over internal sources + * @return Map[propName -> propFile] containing values merged from all sources in order of priority + */ + def mergeSparkProperties(additionalConfigs: Seq[Map[String, String]]) = { +// Configuration read in from spark-submit-defaults.prop file found on the classpath +var hardCodedDefaultConfig: Option[Map[String, String]] = None +var is: InputStream = null +var isr: Option[InputStreamReader] = None try { - val properties = new Properties() - properties.load(inputStream) - properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim)) -} catch { - case e: IOException => -val message = s"Failed when loading Spark properties file $file" -throw new SparkException(message, e) + is = Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults) + + // only open InputStreamReader if InputStream was successfully opened + isr = Option(is).map { is: InputStream => +new InputStreamReader(is, CharEncoding.UTF_8) + } + + hardCodedDefaultConfig = isr.map( defaultValueStream => + SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream)) } finally { - inputStream.close() + Option(is).foreach(_.close) + isr.foreach(_.close) } + +if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) { + throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults") +} + +// Configuration read in from defaults file if it exists +var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig + +if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile)) { + SparkSubmitArguments.getPropertyValuesFromFile( + sparkDefaultConfig.get(SparkPropertiesFile).get) +} else { + Map.empty +} + +// Configuration from java system properties +val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties) + +// Configuration variables from the environment +// support legacy variables +val environmentConfig = System.getenv().asScala + +val legacyEnvVars = Seq("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode, + "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory) + +// legacy variables act at the priority of a system property +val propsWithEnvVars: mutable.Map[String, String] = new mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars + .map { case (varName, propName) => (environmentConfig.get(varName), propName) } + .filter { case (varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty } + .map { case (varVariable, propName) => (propName, varVariable.get) } + +val ConfigSources = additionalConfigs ++ Seq ( + environmentConfig, + propsWithEnvVars, + sparkDefaultConfig, + hardCodedDefaultConfig.get +) + +// Load properties file
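The legacy-variable pipeline quoted above does three things: map each legacy env var to its property name, drop variables that are unset or empty, then flip the pairs into property -> value form. That can be sketched against a plain `Map` instead of `System.getenv`; the variable and property names here are illustrative only:

```scala
// hypothetical legacy-variable table, mirroring the shape used in the diff
val legacyEnvVars = Seq(
  "MASTER"      -> "spark.master",
  "DEPLOY_MODE" -> "spark.submit.deployMode")

// translate legacy environment variables into spark properties,
// keeping only variables that are present and non-empty
def translateLegacy(env: Map[String, String]): Map[String, String] =
  legacyEnvVars.flatMap { case (varName, propName) =>
    env.get(varName).filter(_.nonEmpty).map(propName -> _)
  }.toMap

// DEPLOY_MODE is set but empty, so it is dropped
val translated = translateLegacy(Map("MASTER" -> "yarn", "DEPLOY_MODE" -> ""))
```

Using `flatMap` with `Option` collapses the map/filter/map chain from the diff into one pass, which is the same logic with less plumbing.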
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126499 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import akka.actor.{Address, ActorRef} + +private[cluster] class ExecutorData( --- End diff -- Done
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126510 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import akka.actor.{Address, ActorRef} + +private[cluster] class ExecutorData( + var executorActor: ActorRef, + var executorAddress: Address, + var executorHost: String, + var freeCores: Int, + var totalCores: Int +) {} --- End diff -- done
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126545 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -104,13 +96,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - if (executorActor.contains(executorId)) { -freeCores(executorId) += scheduler.CPUS_PER_TASK -makeOffers(executorId) - } else { -// Ignoring the update since we don't know about the executor. -val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s" -logWarning(msg.format(taskId, state, sender, executorId)) + executorDataMap.get(executorId) match { +case Some(executorInfo) => + executorInfo.freeCores += scheduler.CPUS_PER_TASK + makeOffers(executorId) +case None => + // Ignoring the update since we don't know about the executor. + val msg = "Ignored task status update (%d state %s) " + --- End diff -- Done, and replaced format with an s-interpolated string.
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126658 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import akka.actor.{Address, ActorRef} + +private[cluster] class ExecutorData( + var executorActor: ActorRef, --- End diff -- Good point - all but freeCores changed to vals
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126663 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -126,8 +120,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case StopExecutors => logInfo("Asking each executor to shut down") -for (executor <- executorActor.values) { - executor ! StopExecutor +for ((_, executorData) <- executorDataMap) { --- End diff -- Done
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126665 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -85,16 +79,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def receiveWithLogging = { case RegisterExecutor(executorId, hostPort, cores) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) -if (executorActor.contains(executorId)) { +if (executorDataMap.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) sender ! RegisteredExecutor - executorActor(executorId) = sender - executorHost(executorId) = Utils.parseHostPort(hostPort)._1 - totalCores(executorId) = cores - freeCores(executorId) = cores - executorAddress(executorId) = sender.path.address + executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address, --- End diff -- Done
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126667 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -297,6 +291,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } + --- End diff -- removed
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126669 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -179,25 +176,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } else { - freeCores(task.executorId) -= scheduler.CPUS_PER_TASK - executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) + val executorInfo = executorDataMap(task.executorId) + executorInfo.freeCores -= scheduler.CPUS_PER_TASK + executorInfo.executorActor ! LaunchTask(new SerializableBuffer(serializedTask)) } } } // Remove a disconnected slave from the cluster def removeExecutor(executorId: String, reason: String) { - if (executorActor.contains(executorId)) { -logInfo("Executor " + executorId + " disconnected, so removing it") -val numCores = totalCores(executorId) -executorActor -= executorId -executorHost -= executorId -addressToExecutorId -= executorAddress(executorId) -executorAddress -= executorId -totalCores -= executorId -freeCores -= executorId -totalCoreCount.addAndGet(-numCores) -scheduler.executorLost(executorId, SlaveLost(reason)) + executorDataMap.get(executorId) match { +case Some(executorInfo) => + val numCores = executorInfo.totalCores --- End diff -- expression inlined
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126677 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -149,13 +144,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { launchTasks(scheduler.resourceOffers( -executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) })) +executorDataMap.map { case (id, executorData) => --- End diff -- done
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18126678 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -149,13 +144,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Make fake resource offers on all executors def makeOffers() { launchTasks(scheduler.resourceOffers( -executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) })) +executorDataMap.map { case (id, executorData) => + new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq)) --- End diff -- done
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on the pull request: https://github.com/apache/spark/pull/2516#issuecomment-56777816 vanzin - great feedback. Thanks for the effort of going through the code. I've implemented all the requested changes.
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
GitHub user tigerquoll opened a pull request: https://github.com/apache/spark/pull/2533 SPARK-CORE [SPARK-3651] Group common CoarseGrainedSchedulerBackend variables together from [SPARK-3651] In CoarseGrainedSchedulerBackend, we have: private val executorActor = new HashMap[String, ActorRef] private val executorAddress = new HashMap[String, Address] private val executorHost = new HashMap[String, String] private val freeCores = new HashMap[String, Int] private val totalCores = new HashMap[String, Int] We only ever put / remove stuff from these maps together. It would simplify the code if we consolidate these all into one map as we have done in JobProgressListener in https://issues.apache.org/jira/browse/SPARK-2299. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tigerquoll/spark-tigerquoll SPARK-3651 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2533.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2533 commit 7d671cf230bdad22f42f336174e8e0a8f7bc267b Author: Dale tigerqu...@outlook.com Date: 2014-09-25T10:46:30Z [SPARK-3651] Grouped variables under a ExecutorDataObject, and reference them via a map entry as they are all retrieved under the same key
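The consolidation this PR proposes (one map of per-executor records instead of five parallel HashMaps keyed by executor ID) can be sketched with plain types standing in for the Akka `ActorRef`/`Address` fields, which are omitted here purely to keep the example self-contained:

```scala
import scala.collection.mutable

// stand-in for the grouped per-executor state; field names follow the
// ones quoted in the review, with Akka types simplified away
case class ExecutorData(host: String, var freeCores: Int, totalCores: Int)

val executorDataMap = new mutable.HashMap[String, ExecutorData]()

// register: one put instead of five parallel puts
executorDataMap.put("executor-1", ExecutorData("host-a", freeCores = 4, totalCores = 4))

// task launch: mutate one field of the grouped record
executorDataMap("executor-1").freeCores -= 1

// deregister: one remove instead of five parallel removes, and the
// returned Option carries everything needed for cleanup (e.g. totalCores)
val removed = executorDataMap.remove("executor-1")
```

The design point is that the five maps could previously drift out of sync (a key present in one but not another); a single record per executor makes that state unrepresentable.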
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18065148 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -62,15 +62,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val createTime = System.currentTimeMillis() class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive { - override protected def log = CoarseGrainedSchedulerBackend.this.log - -private val executorActor = new HashMap[String, ActorRef] -private val executorAddress = new HashMap[String, Address] -private val executorHost = new HashMap[String, String] -private val freeCores = new HashMap[String, Int] -private val totalCores = new HashMap[String, Int] private val addressToExecutorId = new HashMap[Address, String] +private val executorData = new HashMap[String, ExecutorData] --- End diff -- changed to executorDataMap = new HashMap[String, ExecutorData]
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18065365

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -85,16 +79,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     def receiveWithLogging = {
       case RegisterExecutor(executorId, hostPort, cores) =>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
-        if (executorActor.contains(executorId)) {
+        if (executorData.contains(executorId)) {
           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
         } else {
           logInfo("Registered executor: " + sender + " with ID " + executorId)
           sender ! RegisteredExecutor
-          executorActor(executorId) = sender
-          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
-          totalCores(executorId) = cores
-          freeCores(executorId) = cores
-          executorAddress(executorId) = sender.path.address
+          executorData.put(executorId, new ExecutorData(
--- End diff --

removed parameter names and re-ordered parameters
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18065400

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -104,13 +100,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case StatusUpdate(executorId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
-          if (executorActor.contains(executorId)) {
-            freeCores(executorId) += scheduler.CPUS_PER_TASK
-            makeOffers(executorId)
-          } else {
-            // Ignoring the update since we don't know about the executor.
-            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
-            logWarning(msg.format(taskId, state, sender, executorId))
+          executorData.get(executorId) match {
+            case Some(executorInfo) =>
--- End diff --

The map is now called ExecutorDataMap
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18065662

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -126,8 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case StopExecutors =>
         logInfo("Asking each executor to shut down")
-        for (executor <- executorActor.values) {
-          executor ! StopExecutor
+        executorData.foreach { case (k, v) =>
--- End diff --

done
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18065891

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -149,13 +147,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Make fake resource offers on all executors
     def makeOffers() {
       launchTasks(scheduler.resourceOffers(
-        executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
+        executorData.map { case (k, v) => new WorkerOffer(k, v.executorHost, v.freeCores) }.toSeq))
--- End diff --

done
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18066183

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -179,25 +178,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         } else {
-          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+          val executorInfo = executorData(task.executorId)
+          executorInfo.freeCores -= scheduler.CPUS_PER_TASK
+          executorInfo.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
         }
       }
     }

     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
-      if (executorActor.contains(executorId)) {
-        logInfo("Executor " + executorId + " disconnected, so removing it")
-        val numCores = totalCores(executorId)
-        executorActor -= executorId
-        executorHost -= executorId
-        addressToExecutorId -= executorAddress(executorId)
-        executorAddress -= executorId
-        totalCores -= executorId
-        freeCores -= executorId
-        totalCoreCount.addAndGet(-numCores)
-        scheduler.executorLost(executorId, SlaveLost(reason))
+      executorData.get(executorId) match {
--- End diff --

Pattern matching on an Option is a clear and well-understood pattern; please let clarity rule over conciseness in this case.
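The clarity-versus-conciseness trade-off argued here (an explicit `match` on the `Option` rather than a combinator such as `foreach`) looks like this in isolation; the identifiers below are illustrative, not the PR's:

```scala
import scala.collection.mutable

object OptionMatchDemo {
  val executorDataMap = mutable.HashMap("exec-1" -> 4)

  // Pattern matching spells out both the found and not-found branches,
  // which reads clearly even without knowing Option's combinators.
  def removeExecutor(id: String): String =
    executorDataMap.get(id) match {
      case Some(cores) =>
        executorDataMap -= id
        s"removed $id ($cores cores)"
      case None =>
        s"asked to remove unknown executor $id"
    }

  def main(args: Array[String]): Unit = {
    println(removeExecutor("exec-1")) // found branch
    println(removeExecutor("exec-2")) // not-found branch
  }
}
```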
[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2533#discussion_r18066186

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -179,25 +178,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         } else {
-          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+          val executorInfo = executorData(task.executorId)
+          executorInfo.freeCores -= scheduler.CPUS_PER_TASK
+          executorInfo.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
         }
       }
     }

     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
-      if (executorActor.contains(executorId)) {
-        logInfo("Executor " + executorId + " disconnected, so removing it")
-        val numCores = totalCores(executorId)
-        executorActor -= executorId
-        executorHost -= executorId
-        addressToExecutorId -= executorAddress(executorId)
-        executorAddress -= executorId
-        totalCores -= executorId
-        freeCores -= executorId
-        totalCoreCount.addAndGet(-numCores)
-        scheduler.executorLost(executorId, SlaveLost(reason))
+      executorData.get(executorId) match {
+        case Some(executorInfo) =>
+          val numCores = executorInfo.totalCores
+          executorData.-=(executorId)
--- End diff --

typo fixed
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18066533

--- Diff: core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
@@ -0,0 +1,18 @@
+
+spark.master = local[*]
--- End diff --

It could, but if this PR gets accepted I'm thinking of extending the concept to other configuration properties as well, many of which have default values buried in other parts of the code.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18066741

--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -330,13 +330,13 @@ private[spark] object SparkConf {
    */
   def isExecutorStartupConf(name: String): Boolean = {
     isAkkaConf(name) ||
-    name.startsWith("spark.akka") ||
-    name.startsWith("spark.auth") ||
-    isSparkPortConf(name)
+      name.startsWith("spark.akka") ||
+      name.startsWith("spark.auth") ||
+      isSparkPortConf(name)
   }

   /**
    * Return whether the given config is a Spark port config.
    */
   def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
-}
+}
--- End diff --

reverted the file
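The predicate quoted in the diff above is small enough to exercise on its own; this sketch restores the string literals the archive stripped and checks the prefix-and-suffix logic:

```scala
object PortConfDemo {
  // Same shape as the quoted SparkConf helper: a config name is a
  // "port config" iff it starts with "spark." and ends with ".port".
  def isSparkPortConf(name: String): Boolean =
    name.startsWith("spark.") && name.endsWith(".port")

  def main(args: Array[String]): Unit = {
    assert(isSparkPortConf("spark.driver.port"))
    assert(!isSparkPortConf("spark.executor.memory")) // wrong suffix
    assert(!isSparkPortConf("akka.remote.port"))      // wrong prefix
    println("ok")
  }
}
```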
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18067374

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +413,166 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   }
 }

 object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
+  /**
+   * Resolves configuration sources in order of highest to lowest priority:
+   * 1. Each map passed in as additionalConfig, from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (e.g. by using -Dspark.var.name)
+   * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf
+   * 5. Hard-coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read in and the properties are
+   * considered to be at the priority of the method that specified the file. A property specified in
+   * a property file will not override an existing config value at that same level.
+   *
+   * @param additionalConfigs additional Map[ConfigName -> ConfigValue] in order of highest
+   *                          priority to lowest
+   * @return Map[propName -> propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Vector[Map[String, String]]) = {
+
+    // Configuration read in from the spark-submit-defaults.prop file found on the classpath
+    val is = Option(Thread.currentThread().getContextClassLoader()
+      .getResourceAsStream(SparkSubmitDefaults))
+
+    val hardCodedDefaultConfig = is.flatMap { x =>
+      Some(SparkSubmitArguments.getPropertyValuesFromStream(x)) }
+
+    if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) {
+      throw new IllegalStateException(s"Default values not found at classpath $SparkSubmitDefaults")
+    }
+
+    // Configuration read in from the defaults file if it exists
+    var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+    if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile)) {
+      SparkSubmitArguments.getPropertyValuesFromFile(
+        sparkDefaultConfig.get(SparkPropertiesFile).get)
+    } else {
+      Map.empty
+    }
+
+    // Configuration from Java system properties
+    val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+    // Configuration variables from the environment
+    // (support legacy variables)
+    val environmentConfig = System.getenv().asScala
+
+    val legacyEnvVars = List("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode,
+      "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory)
+
+    // Legacy variables act at the priority of a system property
+    systemPropertyConfig ++ legacyEnvVars
+      .map { case (varName, propName) => (environmentConfig.get(varName), propName) }
+      .filter { case (varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty }
+      .map { case (varVariable, propName) => (propName, varVariable.get) }

+    val ConfigSources = additionalConfigs ++ Vector(
+      environmentConfig,
+      systemPropertyConfig,
+      sparkDefaultConfig,
+      hardCodedDefaultConfig.get
+    )
+
+    // Load property files at the priority level of the source that specified the property file;
+    // loaded property-file configs will not override existing configs at the priority
+    // level the property file was specified at
+    val processedConfigSource = ConfigSources
+      .map(configMap => getFileBasedPropertiesIfSpecified(configMap) ++ configMap)
+
+    val test = MergedPropertyMap.mergePropertyMaps(processedConfigSource)
+
+    test
+  }
+
+  /**
+   * Returns a map of config values from a property file if
+   * the passed configMap has a SparkPropertiesFile defined pointing
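The priority semantics described in the comment block above reduce to a small fold: iterate the sources from highest to lowest priority and let a key already set never be overridden by a lower level. This is a sketch of those semantics only, not the PR's MergedPropertyMap implementation; the key names are illustrative:

```scala
object PriorityMergeDemo {
  /** Merge maps given in order of highest to lowest priority:
    * a key from a lower-priority map never overrides one already set. */
  def mergeByPriority(sources: Seq[Map[String, String]]): Map[String, String] =
    sources.foldLeft(Map.empty[String, String]) { (acc, src) => src ++ acc }

  def main(args: Array[String]): Unit = {
    val cmdLine  = Map("spark.master" -> "yarn")
    val envVars  = Map("spark.master" -> "local[*]", "spark.app.name" -> "demo")
    val defaults = Map("spark.master" -> "local", "spark.executor.memory" -> "1g")

    val merged = mergeByPriority(Seq(cmdLine, envVars, defaults))
    assert(merged("spark.master") == "yarn")      // highest-priority source wins
    assert(merged("spark.app.name") == "demo")    // gap filled from a lower level
    assert(merged("spark.executor.memory") == "1g")
    println("ok")
  }
}
```

Note the `src ++ acc` ordering: entries accumulated so far (higher priority) shadow the newly visited lower-priority source, which is exactly the "will not override an existing config value" rule from the doc comment.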
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18067408

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -57,6 +57,10 @@ object SparkSubmit {
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101

   // Exposed for testing
+  // testing currently disabled exitFn() from working, so we need to stop execution
--- End diff --

Done
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18067608

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -288,11 +291,11 @@ object SparkSubmit {
   }

   private def launch(
-      childArgs: ArrayBuffer[String],
-      childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String],
-      childMainClass: String,
-      verbose: Boolean = false) {
+      childArgs: mutable.ArrayBuffer[String],
--- End diff --

Double-checked IntelliJ's Scala plugin is set to 2-space tabs. I've deleted the spaces and re-indented - let's see if that solves it.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18067705

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 package org.apache.spark.deploy

-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties

+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}

+import org.apache.commons.lang3.CharEncoding
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils

 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order:
+ * 1. Entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (e.g. by using -Dspark.var.name)
+ * 5. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. Hard-coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in and the properties are
+ * considered to be at the priority of the method that specified the file. A property specified in
+ * a property file will not override an existing config value at that same level.
+ */
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-    val defaultProperties = new HashMap[String, String]()
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-    Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
-          defaultProperties(k) = v
-          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-        } else {
-          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-        }
-      }
-    }
-    defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master = conf(SparkMaster)
+  def master_= (value: String): Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String): Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String): Unit = conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String): Unit = conf.put(SparkExecutorCores, value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String): Unit = conf.put(SparkCoresMax, value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String): Unit = conf.put(SparkDriverMemory, value)
+
+  def driverExtraClassPath
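The accessor pattern in the quoted diff, fields backed by a single conf map via Scala's `def x` / `def x_=` setter convention, can be demonstrated on its own. The key name below is illustrative rather than the PR's ConfigConstants value:

```scala
import scala.collection.mutable

class SubmitConf {
  // All "fields" live in one map, as in the quoted SparkSubmitArguments diff.
  val conf = new mutable.HashMap[String, String]()

  // Defining both `master` and `master_=` gives callers field-like
  // assignment syntax that reads and writes the backing map.
  def master: String = conf("spark.master")
  def master_=(value: String): Unit = conf.put("spark.master", value)
}

object SubmitConfDemo {
  def main(args: Array[String]): Unit = {
    val c = new SubmitConf
    c.master = "local[4]" // desugars to c.master_=("local[4]")
    assert(c.conf("spark.master") == "local[4]")
    assert(c.master == "local[4]")
    println("ok")
  }
}
```

The design trade-off being reviewed: the map makes priority-ordered merging of config sources uniform, at the cost of getters that throw if a key was never populated (note `totalExecutorCores` returns `Option` via `conf.get` while the others use the throwing `conf(...)` form).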
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18067718 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18067811 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18067818

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    @@ -17,155 +17,195 @@ package org.apache.spark.deploy

    -import java.io.{File, FileInputStream, IOException}
    -import java.util.Properties
    +import java.io.{InputStreamReader, File, FileInputStream, InputStream}
     import java.util.jar.JarFile
    +import java.util.Properties
    +
    +import scala.collection._
    +import scala.collection.JavaConverters._
     import scala.collection.JavaConversions._
    -import scala.collection.mutable.{ArrayBuffer, HashMap}
    +
    +import org.apache.commons.lang3.CharEncoding
    +
    -import org.apache.spark.SparkException
    +import org.apache.spark.deploy.ConfigConstants._
     import org.apache.spark.util.Utils

     /**
    - * Parses and encapsulates arguments from the spark-submit script.
    - */
    + * Pulls configuration information together in order of priority.
    + *
    + * Entries in the conf Map will be filled in the following priority order:
    + * 1. Entries specified on the command line (except --conf entries)
    + * 2. Entries specified on the command line with --conf
    + * 3. Environment variables (including legacy variable mappings)
    + * 4. Java system properties (e.g. set via -Dspark.var.name)
    + * 5. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf, if either exists
    + * 6. Hard-coded defaults on the classpath at spark-submit-defaults.prop
    + *
    + * A property file specified by one of the means listed above is read in, and its properties are
    + * considered to be at the priority of the method that specified the file. A property specified in
    + * a property file will not override an existing config value at that same level.
    + */
     private[spark] class SparkSubmitArguments(args: Seq[String]) {
    -  var master: String = null
    -  var deployMode: String = null
    -  var executorMemory: String = null
    -  var executorCores: String = null
    -  var totalExecutorCores: String = null
    -  var propertiesFile: String = null
    -  var driverMemory: String = null
    -  var driverExtraClassPath: String = null
    -  var driverExtraLibraryPath: String = null
    -  var driverExtraJavaOptions: String = null
    -  var driverCores: String = null
    -  var supervise: Boolean = false
    -  var queue: String = null
    -  var numExecutors: String = null
    -  var files: String = null
    -  var archives: String = null
    -  var mainClass: String = null
    -  var primaryResource: String = null
    -  var name: String = null
    -  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
    -  var jars: String = null
    -  var verbose: Boolean = false
    -  var isPython: Boolean = false
    -  var pyFiles: String = null
    -  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
    -
    -  /** Default properties present in the currently defined defaults file. */
    -  lazy val defaultSparkProperties: HashMap[String, String] = {
    -    val defaultProperties = new HashMap[String, String]()
    -    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
    -    Option(propertiesFile).foreach { filename =>
    -      val file = new File(filename)
    -      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
    -        if (k.startsWith("spark")) {
    -          defaultProperties(k) = v
    -          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
    -        } else {
    -          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
    -        }
    -      }
    -    }
    -    defaultProperties
    -  }
    +  /**
    +   * Stores all configuration items except for child arguments,
    +   * referenced by the constants defined in ConfigConstants.scala
    +   */
    +  val conf = new mutable.HashMap[String, String]()
    +
    +  def master = conf(SparkMaster)
    +  def master_= (value: String): Unit = conf.put(SparkMaster, value)
    +
    +  def deployMode = conf(SparkDeployMode)
    +  def deployMode_= (value: String): Unit = conf.put(SparkDeployMode, value)
    +
    +  def executorMemory = conf(SparkExecutorMemory)
    +  def executorMemory_= (value: String): Unit = conf.put(SparkExecutorMemory, value)
    +
    +  def executorCores = conf(SparkExecutorCores)
    +  def executorCores_= (value: String): Unit = conf.put(SparkExecutorCores, value)
    +
    +  def totalExecutorCores = conf.get(SparkCoresMax)
    +  def totalExecutorCores_= (value: String): Unit = conf.put(SparkCoresMax, value)
    +
    +  def driverMemory = conf(SparkDriverMemory)
    +  def driverMemory_= (value: String): Unit = conf.put(SparkDriverMemory, value)
    +
    +  def driverExtraClassPath
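The map-backed accessor pattern in the new code above can be sketched in isolation as follows. This is a minimal sketch: the key strings are illustrative stand-ins for the actual ConfigConstants values, and `SubmitConf` is a hypothetical name.

```scala
import scala.collection.mutable

// Minimal sketch of the map-backed accessor pattern, assuming illustrative
// key names rather than the actual ConfigConstants values.
class SubmitConf {
  val conf = new mutable.HashMap[String, String]()

  // Defining `master` and `master_=` together lets callers write
  // `c.master = "..."`, which Scala desugars to `c.master_=("...")`.
  def master: String = conf("spark.master")
  def master_=(value: String): Unit = conf.put("spark.master", value)

  // conf.get returns an Option, so an absent key does not throw.
  def totalExecutorCores: Option[String] = conf.get("spark.cores.max")
}
```

Note the asymmetry the diff carries over: `conf(key)` throws on a missing key, while `conf.get(key)` returns an `Option`.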
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068019

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    (same diff hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068346

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    (same diff hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068465

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    (same diff hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068498

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    (same diff hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068639

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    @@ -188,41 +228,16 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
             "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
           }
         }
    +  }

       override def toString = {
    -    s"""Parsed arguments:
    -    |  master                  $master
    -    |  deployMode              $deployMode
    -    |  executorMemory          $executorMemory
    -    |  executorCores           $executorCores
    -    |  totalExecutorCores      $totalExecutorCores
    -    |  propertiesFile          $propertiesFile
    -    |  extraSparkProperties    $sparkProperties
    -    |  driverMemory            $driverMemory
    -    |  driverCores             $driverCores
    -    |  driverExtraClassPath    $driverExtraClassPath
    -    |  driverExtraLibraryPath  $driverExtraLibraryPath
    -    |  driverExtraJavaOptions  $driverExtraJavaOptions
    -    |  supervise               $supervise
    -    |  queue                   $queue
    -    |  numExecutors            $numExecutors
    -    |  files                   $files
    -    |  pyFiles                 $pyFiles
    -    |  archives                $archives
    -    |  mainClass               $mainClass
    -    |  primaryResource         $primaryResource
    -    |  name                    $name
    -    |  childArgs               [${childArgs.mkString(" ")}]
    -    |  jars                    $jars
    -    |  verbose                 $verbose
    -    |
    -    |Default properties from $propertiesFile:
    -    |${defaultSparkProperties.mkString("  ", "\n  ", "\n")}
    -    """.stripMargin
    +    conf.mkString("\n")
       }

    -  /** Fill in values by parsing user options. */
    +  /**

--- End diff --

reverted

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068723

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    @@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
     }

    -object SparkSubmitArguments {
    -  /** Load properties present in the given file. */
    -  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
    -    require(file.exists(), s"Properties file $file does not exist")
    -    require(file.isFile(), s"Properties file $file is not a normal file")
    -    val inputStream = new FileInputStream(file)
    +private[spark] object SparkSubmitArguments {
    +  /**
    +   * Resolves configuration sources in order of highest to lowest:
    +   * 1. Each map passed in as additionalConfig, from first to last
    +   * 2. Environment variables (including legacy variable mappings)
    +   * 3. Java system properties (e.g. set via -Dspark.var.name)
    +   * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf
    +   * 5. Hard-coded defaults on the classpath at spark-submit-defaults.prop
    +   *
    +   * A property file specified by one of the means listed above is read in, and its properties
    +   * are considered to be at the priority of the method that specified the file.
    +   * A property specified in a property file will not override an existing
    +   * config value at that same level.
    +   *
    +   * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue] in order of
    +   *                          highest priority to lowest; these take priority over internal sources
    +   * @return Map[propName -> propValue] containing values merged from all sources in priority order
    +   */
    +  def mergeSparkProperties(additionalConfigs: Seq[Map[String, String]]) = {
    +    // Configuration read in from the spark-submit-defaults.prop file found on the classpath
    +    var hardCodedDefaultConfig: Option[Map[String, String]] = None
    +    var is: InputStream = null
    +    var isr: Option[InputStreamReader] = None
         try {
    -      val properties = new Properties()
    -      properties.load(inputStream)
    -      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
    -    } catch {
    -      case e: IOException =>
    -        val message = s"Failed when loading Spark properties file $file"
    -        throw new SparkException(message, e)
    +      is = Thread.currentThread().getContextClassLoader
    +        .getResourceAsStream(ClassPathSparkSubmitDefaults)
    +
    +      // only open InputStreamReader if InputStream was successfully opened
    +      isr = Option(is).map { is: InputStream =>
    +        new InputStreamReader(is, CharEncoding.UTF_8)
    +      }
    +
    +      hardCodedDefaultConfig = isr.map(defaultValueStream =>
    +        SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
         } finally {
    -      inputStream.close()
    +      Option(is).foreach(_.close)
    +      isr.foreach(_.close)
         }
    +
    +    if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) {
    +      throw new IllegalStateException(
    +        s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
    +    }
    +
    +    // Configuration read in from the defaults file if it exists
    +    var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
    +
    +    if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile)) {
    +      SparkSubmitArguments.getPropertyValuesFromFile(
    +        sparkDefaultConfig.get(SparkPropertiesFile).get)
    +    } else {
    +      Map.empty
    +    }
    +
    +    // Configuration from Java system properties
    +    val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties)

--- End diff --

Wish I'd seen that one before. Done.
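The priority rule documented in the scaladoc above (higher-priority sources win; lower-priority ones only fill gaps) can be sketched as a fold over the source maps. This is a sketch under the assumption that `sources` is ordered from highest priority to lowest, as `mergeSparkProperties` documents for its inputs; `mergeByPriority` is a hypothetical helper name.

```scala
// Sketch of priority-ordered merging, assuming `sources` is ordered from
// highest priority to lowest.
def mergeByPriority(sources: Seq[Map[String, String]]): Map[String, String] =
  sources.foldLeft(Map.empty[String, String]) { (acc, lower) =>
    // `++` keeps the right-hand operand's value on key collisions, so
    // entries already accumulated from higher-priority sources are never
    // overwritten by a lower-priority source.
    lower ++ acc
  }
```

For example, a command-line setting merged ahead of an environment map keeps its value, while keys the command line never set fall through to the environment.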
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068816

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    (same mergeSparkProperties hunk as quoted above, commented at the lines)
    +    if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile)) {
    +      SparkSubmitArguments.getPropertyValuesFromFile(

--- End diff --

Line removed completely. All sources have property files loaded and properties merged later on in the function.
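The classpath-resource loading this comment thread concerns can be sketched as below. This is an assumption-laden sketch, not the PR's code: the resource name is illustrative, the UTF-8 handling follows the quoted diff, and `loadClasspathProps` is a hypothetical helper that returns `None` for a missing resource where the real code raises an `IllegalStateException`.

```scala
import java.io.InputStreamReader
import java.util.Properties
import scala.jdk.CollectionConverters._

// Sketch of reading a defaults file from the classpath, loosely following
// the quoted diff. Returns None when the resource is absent.
def loadClasspathProps(resource: String): Option[Map[String, String]] = {
  val stream = Option(
    Thread.currentThread().getContextClassLoader.getResourceAsStream(resource))
  stream.map { is =>
    try {
      val props = new Properties()
      props.load(new InputStreamReader(is, "UTF-8"))
      // Copy into an immutable Scala map, trimming values as the diff does.
      props.stringPropertyNames().asScala
        .map(k => k -> props.getProperty(k).trim).toMap
    } finally {
      is.close()
    }
  }
}
```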
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2516#discussion_r18068897

    --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
    (same mergeSparkProperties hunk as quoted above, commented at the lines)
    +    // Configuration variables from the environment
    +    // support legacy variables
    +    val environmentConfig = System.getenv().asScala
    +
    +    val legacyEnvVars = Seq("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode,

--- End diff --

done
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18068968

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
(same hunk as above, at the comment "// legacy variables act at the priority of a system property")
--- End diff --

To keep things cleaner, I was hoping to only pull information from the various sources and produce a map at the end - what the caller chooses to do with the map of values (push to system props, or env props, or command line arguments, etc.) would be up to the caller.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18069354

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
(same hunk as above, continuing past the legacy-variable list:)
+      "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory)
+
+    // legacy variables act at the priority of a system property
+    val propsWithEnvVars: mutable.Map[String, String] = new mutable.HashMap() ++
+      systemPropertyConfig ++ legacyEnvVars
+        .map { case (varName, propName) => (environmentConfig.get(varName), propName) }
+        .filter { case (varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty }
+        .map { case (varVariable, propName) => (propName, varVariable.get) }
+
+    val ConfigSources = additionalConfigs ++ Seq(
+      environmentConfig,
+      propsWithEnvVars,
+      sparkDefaultConfig,
+      hardCodedDefaultConfig.get
+    )
+
+    // Load properties file
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18070264

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
(same hunk as above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18070363

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
(same hunk as above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
GitHub user tigerquoll opened a pull request: https://github.com/apache/spark/pull/2516

Spark Core - [SPARK-3620] - Refactor of SparkSubmit argument parsing code

Argument processing seems to have gotten a lot of attention lately, so I thought I might throw my contribution into the ring. Attached for consideration and to prompt discussion is a revamp of argument handling in SparkSubmit aimed at making things a lot more consistent. The only things that have been modified are the way that configuration properties are read, processed, and prioritised. Things to note include:

* All configuration parameters can now be consistently set via config file.
* Configuration parameter defaults have been removed from the code and placed into a property file which is read from the class path on startup. There should be no need to trace through 5 files to see what a config parameter defaults to if it is not specified, or have different default values applied in multiple places throughout the code.
* Configuration parameter validation is now done once all configuration parameters have been read in and resolved from various locations, not just when reading the command line.
* All property files (including spark_default_conf) are parsed by Java property handling code. All custom parsing code has been removed. Escaping of characters should now be consistent everywhere.
* All configuration parameters are overridden in the same consistent way - configuration parameters for SparkSubmit are pulled from the following sources in order of priority:
1. Entries specified on the command line (except for --conf entries)
2. Entries specified on the command line with --conf
3. Environment variables (including legacy variable mappings)
4. System config variables (eg by using -Dspark.var.name)
5. $(SPARK_DEFAULT_CONF)/spark-defaults.conf or $(SPARK_HOME)/conf/spark-defaults.conf if either exist
6.
Hard coded defaults in class path at spark-submit-defaults.prop

* A property file specified by one of the sources listed above gets read in and the properties are considered to be at the priority of the configuration source that specified the file. A property specified in a property file will not override an existing config value already specified by that configuration source.

The existing argument handling is pretty finicky - chances are high that I've missed some behaviour - if this PR is going to be accepted/approved, let me know any bugs and I'll fix them up and document the behaviour for future reference.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tigerquoll/spark-3620 master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2516.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2516

commit b1a9682dd2bbff824c4e8481fa0ce5118c47de68
Author: Dale tigerqu...@outlook.com
Date: 2014-09-21T02:42:24Z
Initial pass at using typesafe's conf object for handling configuration options

commit 7bb5ee95b3f06147dba994e3d557221554415bfd
Author: Dale tigerqu...@outlook.com
Date: 2014-09-21T02:44:09Z
Added defaults file

commit e995a6d1e8ab898c85aa5fe259b81c630595075f
Author: Dale tigerqu...@outlook.com
Date: 2014-09-21T12:56:17Z
Existing tests now work

commit 00ee008c5652336d533d9619bc7e6306ed59138b
Author: Dale tigerqu...@outlook.com
Date: 2014-09-21T13:05:14Z
Existing tests now work

commit 295c62b067fb5204efb58892133c77fe49b877e0
Author: Dale tigerqu...@outlook.com
Date: 2014-09-22T22:04:45Z
Created mergedPropertyMap

commit f399170e1c05d75257ff6c508a96e64cadf0d87b
Author: Dale tigerqu...@outlook.com
Date: 2014-09-23T00:10:40Z
Moved sparkSubmitArguments module to use custom property map merging code

commit b0abe3196f9e5d3f577e158704740f1eee8fbb59
Author: Dale tigerqu...@outlook.com
Date: 2014-09-23T23:58:55Z
Merge branch 'master' of https://github.com/apache/spark

commit 562ec7c064e5ad632cf7aaa1720be29fe36b5c9a
Author: Dale tigerqu...@outlook.com
Date: 2014-09-23T23:59:52Z
note for additional tests

commit 86f71f8bb8291fe20a2f0ca0100727d583e97dfd
Author: Dale tigerqu...@outlook.com
Date: 2014-09-24T00:39:47Z
Changes needed to pass scalastyle check

commit 2019554ec307c8d3eabee7e4299cd8bac8faba0f
Author: Dale tigerqu...@outlook.com
Date: 2014-09-24T04:43:58Z
Changes needed to pass scalastyle check, merged from current SparkSubmit.scala

commit 8c416a04d064c1475a184785a9135d849c239bff
Author: Dale tigerqu...@outlook.com
Date: 2014-09-24T05:19:24Z
Fixed some typos

commit b69f58e65d919a689942866f59b11a7dcf2fbf91
Author: Dale tigerqu...@outlook.com
Date: 2014-09-24T07:08:01Z
Added spark.app.name to defaults list
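The priority list in the PR description implies a "first wins" merge: sources are ordered highest priority first, and a key is taken from a lower-priority source only if no higher-priority source defined it. A minimal sketch of that merge, with hypothetical source maps:

```scala
// Merge config sources ordered highest priority first; a later (lower
// priority) source can only fill in keys the earlier sources left unset.
def mergeFirstWins(sources: Seq[Map[String, String]]): Map[String, String] =
  sources.foldLeft(Map.empty[String, String]) { (acc, src) =>
    src.foldLeft(acc) { case (m, (k, v)) =>
      if (m.contains(k)) m else m + (k -> v)
    }
  }

// Hypothetical sources: command line beats the hard-coded defaults file.
val cmdLine  = Map("spark.master" -> "yarn")
val defaults = Map("spark.master" -> "local[*]", "spark.app.name" -> "Unknown Application")
assert(mergeFirstWins(Seq(cmdLine, defaults)) ==
  Map("spark.master" -> "yarn", "spark.app.name" -> "Unknown Application"))
```

This also captures the rule that a property file cannot override a value already set at its own priority level, since the file's entries arrive as just another (equal-or-lower) source in the fold.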
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18008077

--- Diff: core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
@@ -0,0 +1,90 @@
+# The master URL for the cluster, eg spark://host:port, mesos://host:port, yarn, or local.
+# legacy env variable MASTER
+spark.master = local[*]
+
+# Should spark submit run in verbose mode: default is false
+spark.verbose = false
+
+# Comma-separated list of files to be placed in the working directory of each executor
+# spark.files =
+
+# Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
+# spark.submit.pyfiles =
+
+# Comma-separated list of local jars to include on the driver and executor classpaths.
+# spark.jars =
+
+# Path to a bundled jar including your application and all dependencies.
+# The URL must be globally visible inside of your cluster, for instance,
+# an hdfs:// path or a file:// path that is present on all nodes.
+# spark.app.primaryResource =
+
+# A name of your application.
+spark.app.name = Unknown Application
--- End diff --

ahh.. Looking at the code it was hard to determine if primaryResource was a required configuration item at all, which is why I defined it as an Option[String]. I'll remove the default value, change it back to a String, and add an explicit test to make sure it is defined after all new config items have been derived.
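The PR replaces custom parsing with java.util.Properties, so a defaults file like the one above is read with standard Java properties syntax and escaping. A sketch of that load step, using an in-memory reader in place of the classpath stream (the helper name is illustrative, not the PR's actual method):

```scala
import java.io.StringReader
import java.util.Properties
import scala.collection.JavaConverters._

// Load a properties file into an immutable Map, trimming values as the
// original getPropertiesFromFile did.
def propsToMap(reader: java.io.Reader): Map[String, String] = {
  val props = new Properties()
  props.load(reader)
  props.stringPropertyNames().asScala
    .map(k => k -> props.getProperty(k).trim)
    .toMap
}

// Inline fragment standing in for spark-submit-defaults.prop:
val sample = "spark.master = local[*]\nspark.verbose = false\n"
val conf = propsToMap(new StringReader(sample))
assert(conf("spark.master") == "local[*]")
assert(conf("spark.verbose") == "false")
```

Using Properties everywhere is what makes escaping consistent: `\n`, `\:`, unicode escapes, and `#` comments all follow the same documented Java rules regardless of which file supplied the value.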
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18008488

--- Diff: core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
(same hunk as above, continuing:)
+
+# Your application's main class (for Java / Scala apps).
+# spark.app.class =
--- End diff --

primaryResource, spark.app.name and spark.app.class have been removed from the file. I note in SparkSubmit:

    if (mainClass == null && !isPython && primaryResource != null) {
      .. attempt to derive class from primaryResource
    }

Does that mean it is valid to not have a main class defined at this stage if we are running python? Or should we error out if we still do not have a main class after this code, regardless of whether we are running python or not?
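The "derive class from primaryResource" step discussed above amounts to reading the Main-Class attribute from the jar's manifest. A hedged sketch of that lookup with java.util.jar (this is not SparkSubmit's exact code, and the jar path is hypothetical):

```scala
import java.util.jar.{Attributes, JarFile}

// Return the Main-Class manifest attribute of a jar, if it has one.
// A jar without a manifest, or a manifest without Main-Class, yields None.
def mainClassOf(jarPath: String): Option[String] = {
  val jar = new JarFile(jarPath)
  try {
    Option(jar.getManifest)
      .flatMap(m => Option(m.getMainAttributes.getValue(Attributes.Name.MAIN_CLASS)))
  } finally {
    jar.close()
  }
}
```

Usage would look like `mainClassOf("/path/to/app.jar")`, falling back to an error (or to the Python path) when it returns None - which is exactly the decision the question above is asking about.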
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18008765

--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -48,8 +50,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   private[spark] val settings = new HashMap[String, String]()

   if (loadDefaults) {
+    val sparkConfigs = SparkSubmitArguments.mergeSparkProperties(Vector.empty)
     // Load any spark.* system properties
-    for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
+    for ((k, v) <- sparkConfigs
--- End diff --

I've reverted the changes in SparkConf to their original (pre-PR) state until approval in principle is given for the concepts behind the PR, in an attempt to make the PR more understandable.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18009041

--- Diff: core/src/main/scala/org/apache/spark/deploy/MergedPropertyMap.scala ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import scala.collection._
+
+object MergedPropertyMap {
+
+  /**
+   * Flatten a list of maps out into a single map; later maps in the propList
+   * have priority over older ones
+   * @param propList Vector of property maps [PropName -> PropValue] to merge
+   */
+  def mergePropertyMaps(propList: Vector[Map[String, String]]): mutable.Map[String, String] = {
--- End diff --

fixed
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18009050

--- Diff: core/src/main/scala/org/apache/spark/deploy/MergedPropertyMap.scala ---
@@ -0,0 +1,38 @@
+object MergedPropertyMap {
+
+  /**
+   * Flatten a map of maps out into a single map; later maps in the propList
+   * have priority over earlier ones
+   * @param propList Vector of property maps [PropName -> PropValue] to merge
+   */
+  def mergePropertyMaps( propList: Vector[Map[String, String]]): mutable.Map[String, String] = {
+    val propMap = new mutable.HashMap[String, String]()
+    // loop through each entry of each map in order of priority
+    // and add it to our propMap
+    propList.foreach {
+      _.foreach { case (k, v) => if (propMap.getOrElse(k, "").size == 0) propMap.put(k, v) }
--- End diff --

Thanks, missed that cleaner implementation. Changed to immutable Map and moved to utils
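The cleaner immutable-Map approach mentioned in the reply above can be sketched as follows (a hypothetical reconstruction for illustration, not the PR's final code; the object name `MergedPropertyMapSketch` is invented). With immutable maps, `++` is right-biased, so folding left-to-right gives later maps in `propList` priority over earlier ones, and no mutable `HashMap` or explicit presence check is needed:

```scala
// Hypothetical sketch of an immutable-Map version of mergePropertyMaps.
// Map#++ is right-biased: for duplicate keys the right-hand operand wins,
// so folding left-to-right lets later maps in propList override earlier ones.
object MergedPropertyMapSketch {
  def mergePropertyMaps(propList: Seq[Map[String, String]]): Map[String, String] =
    propList.foldLeft(Map.empty[String, String])(_ ++ _)
}
```

For example, merging built-in defaults, then a properties file, then command-line overrides in that order leaves the command-line values in place for any duplicated keys.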
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18009125

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -83,7 +88,7 @@ object SparkSubmit {
    * (4) the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
+    : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
--- End diff --

fixed
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18009140

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -130,7 +135,7 @@ object SparkSubmit {
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
         printErrorAndExit("Could not load YARN classes. " +
-          "This copy of Spark may not have been compiled with YARN support.")
+        "This copy of Spark may not have been compiled with YARN support.")
--- End diff --

fixed up, like last one, not sure how they got mangled
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user tigerquoll commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18009318

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -288,11 +291,11 @@ object SparkSubmit {
   }

   private def launch(
-      childArgs: ArrayBuffer[String],
-      childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String],
-      childMainClass: String,
-      verbose: Boolean = false) {
+    childArgs: ArrayBuffer[String],
--- End diff --

agreed very weird, reverted to original value