[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...

2018-09-10 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21308
  
@rdblue when you say "you don't think the API proposed here needs to support a 
first-class partition concept", are you referring to the "DeleteSupport" 
interface, or to DataSourceV2 in general?
If you are referring to DeleteSupport, do you have the same objections to a 
separate "DropPartition"/"AddPartition" interface?
If you mean that DataSourceV2 as a whole does not need to support partitions as 
a first-class concept, then how are users of Spark supposed to perform 
operations like 
1. adding,  
2. altering,
3. removing, and
4. listing
partitions on data sources that are represented by particular instances of 
DataSourceV2? (A sketch of what such an interface could look like is below.)
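
For the sake of discussion, here is a rough sketch of what such a 
partition-management mix-in could look like (all names here are illustrative 
assumptions, not part of this PR or of DataSourceV2):

```scala
// Hypothetical sketch only - not part of DataSourceV2 or this PR.
// A data source that manages partitions explicitly could mix this in.
trait SupportsPartitionManagement {
  // A partition identified by its column-value spec, e.g. Map("dt" -> "2018-09-10").
  type PartitionSpec = Map[String, String]

  def addPartition(spec: PartitionSpec): Unit
  def alterPartition(spec: PartitionSpec, properties: Map[String, String]): Unit
  def dropPartition(spec: PartitionSpec): Unit
  def listPartitions(): Seq[PartitionSpec]
}
```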


---




[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...

2018-09-06 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21308
  
@rdblue I think our debate comes down to whether we should expose an API for 
direct operations on partitions in the new data source API. 



---




[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...

2018-09-06 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21308
  
@rdblue Actually:  https://issues.apache.org/jira/browse/SPARK-22389.




---




[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...

2018-09-06 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21308
  
@rdblue what about data sources that support record deletion and partition 
dropping as two semantically different operations - Kudu and HBase being two 
examples?

All systems that support partitions have a separate API for partition-level 
operations.  Even file-based table storage systems support these different 
levels of manipulation (look at the SQL DDL that Impala supports for Parquet 
partitions, for example - it uses a filter, but the command means "this 
partition operation applies to the partition defined by this filter", not 
"apply this operation to all records that match this filter").

The difference is subtle, but it is an important one, and every system that 
supports partitions enforces that difference for a reason.
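
To illustrate the distinction in API terms, here is a minimal sketch (the trait 
and method names are hypothetical, chosen only to show the two different 
contracts):

```scala
// Hypothetical sketch - these are not the interfaces proposed in this PR.
trait DeletesByFilter {
  // "Apply this op to all records that match this filter" - a record-level contract.
  def deleteWhere(filter: String): Unit
}

trait DropsPartitions {
  // "This op applies to the partition defined by this spec" - a partition-level
  // contract, which should fail if the spec does not name an existing partition.
  def dropPartition(spec: Map[String, String]): Unit
}
```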


---




[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-09-04 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21306
  
So Kudu range partitions support arbitrarily sized partition intervals, as in 
the example below, where the first and last range partitions are roughly six 
months in size, but the middle partition is one year in size.  

-- Make a table representing a date/time value as TIMESTAMP.
-- The strings representing the partition bounds are automatically
-- cast to TIMESTAMP values.
create table native_timestamp(id bigint, when_exactly timestamp, event 
string, primary key (id, when_exactly))
  partition by range (when_exactly)
  (
partition '2015-06-01' <= values < '2016-01-01',
partition '2016-01-01' <= values < '2017-01-01',
partition '2017-01-01' <= values < '2017-06-01'
  )
  stored as kudu;
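
If it helps, here is roughly how I imagine such arbitrary bounds could be 
described to a catalog as data rather than as a filter (purely a sketch; none 
of these types exist in the proposal):

```scala
// Hypothetical sketch of representing Kudu-style range bounds as data.
// None of these types are part of the proposed catalog API.
case class RangeBound(lowerInclusive: String, upperExclusive: String)
case class RangePartitioning(column: String, bounds: Seq[RangeBound])

val whenExactlyRanges = RangePartitioning(
  column = "when_exactly",
  bounds = Seq(
    RangeBound("2015-06-01", "2016-01-01"),
    RangeBound("2016-01-01", "2017-01-01"),
    RangeBound("2017-01-01", "2017-06-01")))
```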


---




[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-09-04 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21306
  
Sure - I am looking at this from the point of view of supporting Kudu.  Check out 
https://kudu.apache.org/docs/schema_design.html#partitioning for some of the 
details, and in particular 
https://kudu.apache.org/2016/08/23/new-range-partitioning-features.html.
As Kudu is a column store, each column also has attributes associated with it, 
such as encoding and compression codecs.


I really think that partitions should be considered part of the table 
schema.  They have an existence above and beyond the definition of a filter 
that matches records.  Adding an empty partition changes the state of many 
underlying systems.  Many systems that support partitions also have APIs for 
adding and removing partition definitions, and some require partition 
information to be specified at table creation.  Those systems that support 
changing partitions after creation usually have specific APIs for adding and 
removing partitions.
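
As a sketch of what I mean by "part of the table schema" (the type names below 
are made up for illustration, not proposed API):

```scala
// Hypothetical sketch: partition definitions carried alongside the schema in
// table metadata, rather than being inferred from filters at operation time.
import org.apache.spark.sql.types.StructType

case class PartitionDefinition(columns: Seq[String], transform: String)

case class TableDefinition(
  schema: StructType,                    // the columns
  partitions: Seq[PartitionDefinition],  // partitioning declared with the table
  properties: Map[String, String])
```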


Dale,


From: Ryan Blue 
Sent: Tuesday, 4 September 2018 4:20 PM
To: apache/spark
    Cc: tigerquoll; Comment
Subject: Re: [apache/spark] [SPARK-24252][SQL] Add catalog registration and 
table catalog APIs. (#21306)


Can we support column range partition predicates please?

This has an "apply" transform for passing other functions directly through, 
so that may help if you have additional transforms that aren't committed to 
Spark yet.

As for range partitioning, can you be more specific about what you mean? 
What does that transform function look like? Part of the rationale for the 
existing proposal is that these are all widely used and understood. I want to 
make sure that as we expand the set of validated transforms, we aren't 
introducing confusion.

Also, could you share the use case you intend for this? It would be great 
to hear about uses other than just Iceberg tables.




---




[GitHub] spark issue #21308: [SPARK-24253][SQL] Add DeleteSupport mix-in for DataSour...

2018-09-04 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21308
  
I am assuming this API was intended to support the "drop partition" 
use case.  I'm arguing that adding and deleting partitions deal with a concept 
at a slightly higher level than just a bunch of records that match a filter.  
Backing this up is the fact that partitions are defined independently of any 
records they may or may not contain - you can add an empty partition and the 
underlying state of the system will change.

Also - as an end user I would be very upset if I meant to drop a partition 
but, because of a transcription error, accidentally started a delete operation 
whose filter didn't exactly match a partition definition and that took a 
million times as long to execute.  

Partitions are an implementation optimisation that has leaked into higher-level 
APIs because they are an extremely useful and performant optimisation.  I am 
wondering if we should represent them in this API as something slightly higher 
level than just a filter definition.
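
A tiny sketch of the kind of safety I have in mind, assuming hypothetical 
listPartitions/dropPartition/deleteWhere methods (none of which are in this PR):

```scala
// Hypothetical sketch only. The point: a drop-partition request should fail fast
// if the spec is not an exact partition, rather than silently degrading into a
// (possibly very expensive) record-level delete.
trait PartitionedDeletes {
  def listPartitions(): Seq[Map[String, String]]
  def dropPartition(spec: Map[String, String]): Unit
  def deleteWhere(filter: String): Unit
}

def dropPartitionStrictly(table: PartitionedDeletes, spec: Map[String, String]): Unit = {
  require(table.listPartitions().contains(spec),
    s"$spec does not identify an existing partition; refusing to fall back to deleteWhere")
  table.dropPartition(spec)
}
```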


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-09-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r214808418
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link TableCatalog#alterTable}. For example,
+ * 
+ *   import TableChange._
+ *   val catalog = source.asInstanceOf[TableSupport].catalog()
+ *   catalog.alterTable(ident,
+ *   addColumn("x", IntegerType),
+ *   renameColumn("a", "b"),
+ *   deleteColumn("c")
+ * )
+ * 
+ */
+public interface TableChange {
--- End diff --

Is adding or dropping table partitions a table change?
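
For example, something along these lines, mirroring the 
addColumn/renameColumn/deleteColumn style above (hypothetical only; these 
changes are not in this diff):

```scala
// Hypothetical sketch of partition changes expressed as TableChange-style values.
sealed trait PartitionChange
case class AddPartition(spec: Map[String, String]) extends PartitionChange
case class DropPartition(spec: Map[String, String]) extends PartitionChange

// e.g. catalog.alterTable(ident,
//        AddPartition(Map("dt" -> "2018-09-04")),
//        DropPartition(Map("dt" -> "2017-09-04")))
```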


---




[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...

2018-09-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/21306#discussion_r214806854
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * TableChange subclasses represent requested changes to a table. These 
are passed to
+ * {@link TableCatalog#alterTable}. For example,
+ * 
+ *   import TableChange._
+ *   val catalog = source.asInstanceOf[TableSupport].catalog()
+ *   catalog.alterTable(ident,
+ *   addColumn("x", IntegerType),
+ *   renameColumn("a", "b"),
+ *   deleteColumn("c")
+ * )
+ * 
+ */
+public interface TableChange {
--- End diff --

Can we support adding a comment to a column or to a table?


---




[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...

2018-09-04 Thread tigerquoll
Github user tigerquoll commented on the issue:

https://github.com/apache/spark/pull/21306
  
Can we support column range partition predicates please?


---




[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-06-29 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/5250#issuecomment-116651383
  
Correct.

Cheers, Dale.

Date: Fri, 19 Jun 2015 06:01:26 -0700
From: notificati...@github.com
To: sp...@noreply.github.com
CC: tigerqu...@outlook.com
Subject: Re: [spark] [CORE] [SPARK-6593] Provide option for HadoopRDD to 
skip corrupted files (#5250)

I think this should be closed now in favor of #5368 right?



---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-06-29 Thread tigerquoll
Github user tigerquoll closed the pull request at:

https://github.com/apache/spark/pull/5250


---



[GitHub] spark pull request: [SPARK-6214][CORE] - A simple expression langu...

2015-05-05 Thread tigerquoll
Github user tigerquoll closed the pull request at:

https://github.com/apache/spark/pull/4937


---



[GitHub] spark pull request: [SPARK-6214][CORE] - A simple expression langu...

2015-05-04 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/4937#issuecomment-98918915
  
Ok, no problems.
I can understand why.

Regards,
Dale.

Sent from my iPad

 On 4 May 2015, at 9:48 pm, Sean Owen notificati...@github.com wrote:
 
 Do you mind closing this PR, simply because I don't think it's going to 
proceed in Spark? It will not disappear but still be here for reference, and as 
I say I do think it's interesting enough to merit its own project.
 



---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide a HadoopRDD varian...

2015-04-16 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/5368#issuecomment-93705438
  
Sounds like a reasonable idea - I'll put another PR together.

Regards, Dale.

Date: Wed, 15 Apr 2015 10:25:26 -0700
From: notificati...@github.com
To: sp...@noreply.github.com
CC: tigerqu...@outlook.com
Subject: Re: [spark] [CORE] [SPARK-6593] Provide a HadoopRDD variant that 
wraps all reads in a Try (#5368)

An RDD that handles read exceptions with Try does seem useful to me - I've 
seen a decent amount of boilerplate used to accomplish the same thing.  Is 
there a reason this is HadoopRDD-specific?  I.e. why not provide something like 
a TryRDD that can wrap any RDD doing a risky operation?



---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide a HadoopRDD varian...

2015-04-06 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/5368#issuecomment-89942497
  
The name of the RDD is a placeholder.  I'm happy to accept any suggestion 
you have for a name that makes more sense to you.  The InputFormat-based 
solution you mention implies swallowing exceptions and continuing without a 
trace - something you argued against in the previous PR 
(https://github.com/apache/spark/pull/5250) related to this issue.  


---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide a HadoopRDD varian...

2015-04-05 Thread tigerquoll
GitHub user tigerquoll opened a pull request:

https://github.com/apache/spark/pull/5368

[CORE] [SPARK-6593] Provide a HadoopRDD variant that wraps all reads in a 
Try

@rxin @sryza  
As per the attached JIRA ticket, I was proposing some way of making Hadoop IO 
more robust in response to malformed files that cause an exception in the 
Hadoop input libraries.  The previous PR (#5250) simply added an option to 
ignore exceptions and continue processing.  This raised some concerns about not 
giving any indication to the user that an error had occurred (other than a 
warning message in the logs).

As an alternative to the first PR, and to keep the conversation moving, I'm 
putting forward this PR, which contains HadoopReliableRDD.  It wraps all 
Hadoop IO in a Try and passes that back to the user, so that the user is forced 
to act upon any non-fatal errors that occur when reading from a Hadoop file 
system using this API. 

An example of using this API is given below:

```
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import scala.util.{Failure, Success, Try}

// hdfs directory contains test[1-4].txt.gz - test4.txt.gz is corrupted
val path = "hdfs:///user/cloudera/*.gz"

val testRdd = sc.hadoopReliableFile(path, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], 2)
…
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test4.txt.gz:0+42544
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test3.txt.gz:0+15043
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test1.txt.gz:0+15043
15/04/05 21:24:57 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/cloudera/test2.txt.gz:0+15043
…
15/04/05 21:24:57 WARN rdd.HadoopReliableRDD: Exception on read attempt IOException - not a gzip file - stopping any further read attempts from this split
…

testRdd.count 
…
res5: Long = 4384

testRdd.filter(x => x.isSuccess).count
...
res6: Long = 4383

testRdd.map( _ match {
  case Failure(ex) => s"${ex.getClass.getSimpleName}: ${ex.getMessage}"
  case Success(_) => "success"
}).countByValue()
..
res7: scala.collection.Map[String,Long] = Map(IOException: not a gzip file -> 1, success -> 4383)


testRdd.filter(_.isSuccess).map(_.get.toString).flatMap(_.split(" ")).countByValue
...
… and on we go…

```

Just to emphasise what we are doing here: we are trapping exceptions that 
previously would have stopped Spark from executing, and continuing to process 
data in other tasks rather than terminating the entire application, which is 
what previously happened.  If the exceptions were being raised in a 
non-deterministic way (e.g. a temporary network failure), then the produced 
data could be different if the code is run a second time.  If we didn't handle 
these errors, the entire application would currently fail in a 
non-deterministic way.  A lot of the time the latter behaviour is the desirable 
response, but sometimes it is not.

If the exceptions were being raised in a deterministic way (e.g. the file was 
corrupt before it was copied into HDFS), then the data will be produced in a 
deterministic way that can be repeated with the same results.  

I believe giving users some way to process data that Spark currently 
crashes on is worth considering, whatever form it finally takes.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tigerquoll/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5368.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5368


commit bffc68e64098fe24464dfe0eca0eba15afef85c8
Author: Dale tigerqu...@outlook.com
Date:   2015-04-04T11:43:03Z

[SPARK-6593] Refactored HadoopRDD so that the iterator could be cleanly 
wrapped

commit fd75bc1dae5b707bcdd914d4bca852b2594daa60
Author: Dale tigerqu...@outlook.com
Date:   2015-04-04T11:44:09Z

[SPARK-6593] Added HadoopReliableRDD, which returns all read actions in a 
wrapped in a Try

commit d2ab7044516ebda33ceeca70bb6aa9c217af42ca
Author: Dale tigerqu...@outlook.com
Date:   2015-04-05T12:14:41Z

[SPARK-6593] Fine-tuned the iterator wrapper logic

commit cb88164bce0552617b5dbdfdff0f02fe018d84d5
Author: Dale tigerqu...@outlook.com
Date:   2015-04-05T12:15:48Z

[SPARK-6593] Added convenience method hadoopReliableFile() to Scala Context




---

[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-03-30 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/5250#discussion_r27377569
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -246,6 +249,15 @@ class HadoopRDD[K, V](
 } catch {
   case eof: EOFException =>
 finished = true
+  case e: Exception =>
--- End diff --

Having been on the receiving end of things, I know that the gzip module 
throws an IOException, but unfortunately I have no knowledge of what exceptions 
the Hadoop input modules throw, or whether they propagate exceptions up from 
other third-party libraries.  Catching such a broad exception is mitigated by 
the fact that this particular option defaults to off, and should only be 
enabled when you are trying to parse files that you know are corrupt.  Given 
that situation, when the option is turned on we should really try to finish 
processing files to the best of our ability, so I think in this case catching 
'Exception' might be appropriate.
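
To make the intent concrete, the kind of guarded catch I have in mind looks 
roughly like this (a sketch, not the exact patch; `ignoreInputErrors` stands in 
for the proposed, off-by-default config value):

```scala
import java.io.EOFException
import scala.util.control.NonFatal

// Sketch only: attempt a read, treating any non-fatal exception as end-of-split
// when the (off-by-default) ignoreInputErrors option is enabled.
def tryReadNext(readNext: () => Boolean, ignoreInputErrors: Boolean): Boolean =
  try {
    readNext()
  } catch {
    case _: EOFException =>
      false                 // normal end of split
    case NonFatal(e) if ignoreInputErrors =>
      println(s"Skipping rest of split after ${e.getClass.getSimpleName}: ${e.getMessage}")
      false                 // stop reading this split, but keep the job alive
  }
```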


---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-03-30 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/5250#issuecomment-87681300
  
If a user can write Scala code that appropriately deals with the problem, 
why can't they write Spark code to deal with it in parallel?  Isn't this what 
Spark is about?  Isn't this a problem that can be readily parallelised?  Spark 
is being put forward as a data processing framework - bad data needs to be 
handled in some way better than just refusing to have anything to do with it.

I believe that parallelising the solution you mention means adding to the 
public API, which takes time and consideration.  The option was conceived as a 
scoped, quick-fix solution to at least give users some ability to continue - 
the idea would be to retire the option once a new API was in place to 
gracefully deal with the problem.

In regards to the option being presented to users as a fine thing to do when I 
don't believe it is - how about providing the information to the user and 
letting users choose for themselves?  You make a good point about an option 
being a public API though - what is the understanding about how stable options 
are?  There are no real Experimental or DeveloperAPI tags available here.

Your proposed solution was the same one I ended up settling on when first 
confronted with the issue - but only after a number of frustrated attempts at 
getting Spark to do what I wanted.  What you proposed, and what I did in the 
end, was to give up on using Spark and bash out some standalone code using the 
Hadoop libraries to do the job - i.e. stop using Spark and use another tool 
that made the job easier.  I felt that it didn't have to be this way.


---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-03-30 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/5250#issuecomment-87650287
  
Hi Sean, 
Thanks for your input - your views have helped me refine my thinking on the 
matter.  I believe that if you take a purist's point of view then yes, you can 
say the source of the problem is (most likely) with the data producer and 
should be fixed at the data producer's end. 

The point is that this is a problem affecting many Spark users right now, and 
many users are not in control of the source system of the data they are 
analysing and are forced to 'make do' with what they have.  You call this 
solution a band-aid - but many ETL solutions are band-aids - and providing this 
functionality is useful and serves a purpose for the end user.

Are you concerned that swallowing an exception could leave the Hadoop input 
libraries in an inconsistent state, causing more data corruption?  This will 
not happen, because swallowing the exception triggers the immediate finish of 
the file-reading task and no more data will be read by that task.

Are you concerned that swallowing an exception indicates that something has 
potentially gone wrong earlier in the Hadoop input read, and that previous data 
could have been corrupted?  The user already knows this is potentially the case, 
because running the application without this option enabled has caused the 
application to terminate in the first place.

The fact that we are being more permissive of potentially corrupt data is a 
show stopper for this being default behaviour - but I'm not proposing this be 
default behaviour.  I'm proposing this be a last-ditch option that an advanced 
user can knowingly enable when attempting to deal with corrupted data, with the 
understanding that their data could be made worse, but that most likely corrupt 
data will simply be omitted. 

The alternative is to tell them that their data is not suitable for loading 
into Spark, and that perhaps they should use another tool, or tell the data 
system's owner to fix their data feeds and get back to them with another data 
set some time in the future.  I know which option I would prefer if given the 
choice - don't let perfect be the enemy of good.



---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-03-30 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/5250#issuecomment-87822606
  
To sum up the debate so far: there doesn't appear to be any major concern 
about leaving the system in an inconsistent state.  The major concern seems to 
be about swallowing the exception and not letting any sign of it propagate 
back up to the main flow of execution, with the additional potential risk that 
doing so could lead to non-deterministic results.

By swallowing non-deterministic exceptions, we are explicitly converting 
non-deterministic application crashes into potentially non-deterministic data - 
there is no getting around this fact.

I'd like to achieve an outcome here, and I'm not really wedded to the means by 
which that outcome is achieved.  As a counter-proposal, how about a new 
HadoopRDD derivative that works with Try[(K, V)]?  We propagate the exception 
cleanly back to the calling code and allow it to be dealt with explicitly.  It 
is slightly more cumbersome to use, but you'd only use it when you are 
expecting corrupted data back, and it allows easy manipulation of data that 
contains potential exceptions.
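
A minimal sketch of the shape I have in mind (this is not the PR's 
implementation; it only shows an iterator of Try[(K, V)] wrapping a risky 
reader):

```scala
import scala.util.{Failure, Success, Try}

// Sketch only: wrap each read in a Try so the caller decides what to do with failures.
def reliably[K, V](rawReads: Iterator[(K, V)]): Iterator[Try[(K, V)]] =
  new Iterator[Try[(K, V)]] {
    private var failed = false
    def hasNext: Boolean = !failed && rawReads.hasNext
    def next(): Try[(K, V)] = Try(rawReads.next()) match {
      case ok @ Success(_) => ok
      case err @ Failure(_) =>
        failed = true        // stop after the first bad read from this source
        err
    }
  }
```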


---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-03-30 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/5250#issuecomment-87825625
  
I'd also like to point out the converse of my second paragraph above - we 
are also providing a means of converting deterministic application crashes 
into deterministic data.  


---



[GitHub] spark pull request: [CORE] [SPARK-6593] Provide option for HadoopR...

2015-03-29 Thread tigerquoll
GitHub user tigerquoll opened a pull request:

https://github.com/apache/spark/pull/5250

[CORE] [SPARK-6593] Provide option for HadoopRDD to skip bad data splits

When reading a large number of files from HDFS, e.g. with 
sc.textFile("hdfs:///user/cloudera/logs*.gz"), if a single split is corrupted 
then the entire job is cancelled.  As default behaviour this is probably for 
the best, but it would be nice, in circumstances where you know it will be OK, 
to have the option to skip the corrupted portion and continue the job.

Ideally I'd like to be able to report the list of bad files directly back 
to the master, but that would involve a public API change and is a discussion 
for later.  For now, I propose a new option, 
'spark.hadoop.ignoreInputErrors', which defaults to false to maintain the 
existing behaviour of terminating the job upon any exception from the Hadoop 
input libraries.  When set to true, however, this option will simply log the 
error to the executor's log, skip the input split that caused the error, and 
continue processing.
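
Usage would look something like this (a sketch of the proposed behaviour, not 
of anything in current Spark; the option name is the one proposed above):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the proposed opt-in behaviour: with the option enabled, corrupted
// splits are logged and skipped instead of failing the whole job.
val conf = new SparkConf()
  .setMaster("local[2]")    // local master just for illustration
  .setAppName("skip-bad-splits-example")
  .set("spark.hadoop.ignoreInputErrors", "true")
val sc = new SparkContext(conf)

val lineCount = sc.textFile("hdfs:///user/cloudera/logs*.gz").count()
```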

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tigerquoll/spark SPARK-6593a

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5250


commit 37d2cbde5b67c94cac26e2c55030e9bb00678d7a
Author: Dale tigerqu...@outlook.com
Date:   2015-03-29T11:23:01Z

[SPARK-6593] Added spark.hadoop.ignoreInputEerrors config option and 
implementation




---



[GitHub] spark pull request: [Spark-6214][CORE] - A simple expression langu...

2015-03-07 Thread tigerquoll
GitHub user tigerquoll opened a pull request:

https://github.com/apache/spark/pull/4937

[Spark-6214][CORE]  - A simple expression language to specify configuration 
options

This is a proposal to allow configuration options to be specified via a 
simple expression language.  This language would have the following features:

* Allow for basic arithmetic (+, -, /, *), with support for bracketed 
expressions and standard precedence rules.

* Support for, and normalisation of, common units of reference, e.g. MB, GB, 
seconds, minutes, hours, days and weeks.

* Allow for the referencing of basic environmental information, currently 
defined as:
numCores: number of cores assigned to the JVM

physicalMemoryBytes:  memory size of the hosting machine

JVMTotalMemoryBytes:  current bytes of memory allocated to the JVM

JVMMaxMemoryBytes:    maximum number of bytes of memory available to the 
JVM

JVMFreeMemoryBytes:   maxMemoryBytes - totalMemoryBytes

* Allow for the limited referencing of other configuration values when 
specifying values.  (Other configuration values must be initialised and 
explicitly passed into the expression evaluator for this functionality to be 
enabled.)
 
Such a feature would have the following end-user benefits:

* Allows time intervals or byte quantities to be specified in appropriate and 
easy-to-follow units, e.g. 1 week rather than 604800 seconds.

* Gives a consistent means of entering configuration information regardless 
of the option being set (e.g. questions such as 'is this particular option 
specified in ms or seconds?' become irrelevant, because the user can pick 
whatever unit makes sense for the magnitude of the value they are specifying).

* Allows a configuration option to be scaled in relation to system 
attributes, e.g.
SPARK_WORKER_CORES = numCores - 1
SPARK_WORKER_MEMORY = physicalMemoryBytes - 1.5 GB

* Allows multiple configuration options to be scaled together, e.g.:
spark.driver.memory = 0.75 * physicalMemoryBytes
spark.driver.maxResultSize = spark.driver.memory * 0.8

This PR only contains the implementation of the expression language and 
associated unit tests (more than 120 unit tests).  If this PR is accepted, the 
idea is that moving options over to use this expression language would be done 
in one or more follow-up PRs.  A sketch of the kind of grammar involved is 
given below.
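
To give a feel for that grammar, here is a heavily simplified sketch using 
Scala parser combinators (this is not the parser in the PR; the grammar, unit 
handling and environment names are illustrative only):

```scala
import scala.util.parsing.combinator.JavaTokenParsers

// Simplified sketch of an arithmetic-with-units expression language.
// Not the PR's parser: the grammar, units and variable names are illustrative.
class ConfigExprParser(env: Map[String, Double]) extends JavaTokenParsers {
  def expr: Parser[Double] = term ~ rep(("+" | "-") ~ term) ^^ {
    case first ~ rest => rest.foldLeft(first) {
      case (acc, "+" ~ v) => acc + v
      case (acc, "-" ~ v) => acc - v
    }
  }
  def term: Parser[Double] = factor ~ rep(("*" | "/") ~ factor) ^^ {
    case first ~ rest => rest.foldLeft(first) {
      case (acc, "*" ~ v) => acc * v
      case (acc, "/" ~ v) => acc / v
    }
  }
  def factor: Parser[Double] =
    quantity | ident ^^ (name => env(name)) | "(" ~> expr <~ ")"
  // e.g. "1.5 GB", "512 MB", "604800"
  def quantity: Parser[Double] = floatingPointNumber ~ opt("GB" | "MB") ^^ {
    case n ~ Some("GB") => n.toDouble * 1024 * 1024 * 1024
    case n ~ Some("MB") => n.toDouble * 1024 * 1024
    case n ~ _          => n.toDouble
  }

  def evaluate(s: String): Double = parseAll(expr, s).get
}

// e.g. new ConfigExprParser(Map("numCores" -> 8.0, "physicalMemoryBytes" -> 16e9))
//        .evaluate("0.75 * physicalMemoryBytes")   // => 1.2E10
```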

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tigerquoll/spark SPARK-6214

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/4937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4937


commit 49e981005ae736b879f6a4330ec17d2828c74400
Author: Dale tigerqu...@outlook.com
Date:   2015-02-26T22:53:46Z

Initial checkin of basic expression parser

commit c6f137a0233aace1a1b6221c9ea3b89a4c8929dc
Author: Dale tigerqu...@outlook.com
Date:   2015-03-01T06:02:30Z

WIP for ByteExpressionParser

commit 07b8b00570c70b474adde81477461bd06658237d
Author: Dale tigerqu...@outlook.com
Date:   2015-03-01T12:41:12Z

WIP for TimeExpressionParser

commit ae710cdb40d6e28ecee8e2add1d91748920cc182
Author: Dale tigerqu...@outlook.com
Date:   2015-03-05T21:40:48Z

Code tidy up, added some comments, created factory object

commit 6a32ef9b812259ed42c047332d0f23c68d445531
Author: Dale tigerqu...@outlook.com
Date:   2015-03-07T06:25:52Z

Added Physical Memory function, more unit tests for quantity objects

commit 5867704a3abcc23cd3f3c56a4de2b1df26390877
Author: Dale tigerqu...@outlook.com
Date:   2015-03-07T06:26:29Z

Added more unit tests to increase code coverage

commit 5db9ef08e398c371a4dd65a0e2d6c0f5c5b4a6ef
Author: Dale tigerqu...@outlook.com
Date:   2015-03-07T06:27:21Z

Refactored code

commit 6795a8d70f96c22c229447ee727787389079a474
Author: Dale tigerqu...@outlook.com
Date:   2015-03-07T06:45:56Z

lint checks now pass

commit c6cb7f13eed00d702c3d29d8e6bbd53f4acf35cb
Author: Dale tigerqu...@outlook.com
Date:   2015-03-07T06:49:27Z

Quick import tidyup

commit ede092d1f8b1049bb37a0aa022a24c6e0e14b39d
Author: Dale tigerqu...@outlook.com
Date:   2015-03-07T09:38:44Z

Moved cpuCores to MachineInfoFunctions trait

commit 5fd80e9828a2808f73ed72580827b37ffd69dc82
Author: Dale tigerqu...@outlook.com
Date:   2015-03-07T09:57:11Z

Updated comments




---

[GitHub] spark pull request: spark-core - [SPARK-4787] - Stop sparkcontext ...

2014-12-29 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/3809#discussion_r22334486
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -329,8 +329,11 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   try {
 dagScheduler = new DAGScheduler(this)
   } catch {
-case e: Exception => throw
-  new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
+case e: Exception => {
+  stop()
--- End diff --

Excellent idea Josh.


---



[GitHub] spark pull request: spark-core - [SPARK-4787] - Stop sparkcontext ...

2014-12-26 Thread tigerquoll
GitHub user tigerquoll opened a pull request:

https://github.com/apache/spark/pull/3809

spark-core - [SPARK-4787] - Stop sparkcontext properly if a DAGScheduler 
init error occurs

[SPARK-4787] Stop SparkContext properly if an exception occurs during 
DAGscheduler initialization.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tigerquoll/spark SPARK-4787

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3809


commit 217257879fe7c98673caf14b980790498887581e
Author: Dale tigerqu...@outlook.com
Date:   2014-12-26T09:33:05Z

[SPARK-4787] Stop context properly if an exception occurs during 
DAGScheduler initialization.




---



[GitHub] spark pull request: SPARK-4567. Make SparkJobInfo and SparkStageIn...

2014-12-26 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/3426#issuecomment-68165724
  
Hey @JoshRosen @sryza, should this patch include a serialVersionUID 
attribute on the classes to be serialized, to make sure compiler quirks don't 
cause different UIDs to be generated for the classes?
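
For reference, pinning the UID would look like this (the class below is a 
stand-in for illustration, not the actual SparkJobInfo/SparkStageInfo):

```scala
// Sketch only: pin the serialVersionUID so recompilation or compiler differences
// don't change the generated UID and break cross-version deserialization.
@SerialVersionUID(1L)
class JobInfoStandIn(val jobId: Int, val stageIds: Array[Int]) extends Serializable
```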


---



[GitHub] spark pull request: SPARK-4567. Make SparkJobInfo and SparkStageIn...

2014-12-26 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/3426#issuecomment-68166415
  

http://stackoverflow.com/questions/285793/what-is-a-serialversionuid-and-why-should-i-use-it
 seems to be a good summary of the pros and cons of this approach.


---



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-12-14 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/2516#issuecomment-66909268
  
No problem - it was actually a nice way to start poking through the code 
to figure out how things are put together.  I'll stick to smaller jobs from now 
on.

Regards, Dale.

Date: Tue, 9 Dec 2014 19:11:25 -0800
From: notificati...@github.com
To: sp...@noreply.github.com
CC: tigerqu...@outlook.com
Subject: Re: [spark] Spark Core - [SPARK-3620] - Refactor of SparkSubmit 
Argument parsing code (#2516)

Hey @tigerquoll usually for large patches like this we require a design doc 
on the JIRA. Especially because the priority of this is not super important, I 
would recommend that we close this issue for now, and maybe open a new one 
later once there is a consensus on how we should restructure Spark submit. 
Thanks for your work so far.




---



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-11-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r19793754
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -83,216 +79,163 @@ object SparkSubmit {
*   (4) the main class for the child
*/
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-  : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], 
String) = {
+  : (mutable.ArrayBuffer[String], mutable.ArrayBuffer[String], 
Map[String, String], String) = {
 
 // Values to return
-val childArgs = new ArrayBuffer[String]()
-val childClasspath = new ArrayBuffer[String]()
-val sysProps = new HashMap[String, String]()
+val childArgs = new mutable.ArrayBuffer[String]()
+val childClasspath = new mutable.ArrayBuffer[String]()
+val sysProps = new mutable.HashMap[String, String]()
 var childMainClass = 
 
-// Set the cluster manager
-val clusterManager: Int = args.master match {
-  case m if m.startsWith(yarn) = YARN
-  case m if m.startsWith(spark) = STANDALONE
-  case m if m.startsWith(mesos) = MESOS
-  case m if m.startsWith(local) = LOCAL
-  case _ = printErrorAndExit(Master must start with yarn, spark, 
mesos, or local); -1
-}
-
-// Set the deploy mode; default is client mode
-var deployMode: Int = args.deployMode match {
-  case client | null = CLIENT
-  case cluster = CLUSTER
-  case _ = printErrorAndExit(Deploy mode must be either client or 
cluster); -1
-}
-
-// Because yarn-cluster and yarn-client encapsulate both the master
-// and deploy mode, we have some logic to infer the master and deploy 
mode
-// from each other if only one is specified, or exit early if they are 
at odds.
-if (clusterManager == YARN) {
-  if (args.master == yarn-standalone) {
-printWarning(\yarn-standalone\ is deprecated. Use 
\yarn-cluster\ instead.)
-args.master = yarn-cluster
-  }
-  (args.master, args.deployMode) match {
-case (yarn-cluster, null) =
-  deployMode = CLUSTER
-case (yarn-cluster, client) =
-  printErrorAndExit(Client deploy mode is not compatible with 
master \yarn-cluster\)
-case (yarn-client, cluster) =
-  printErrorAndExit(Cluster deploy mode is not compatible with 
master \yarn-client\)
-case (_, mode) =
-  args.master = yarn- + Option(mode).getOrElse(client)
-  }
-
+if (args.clusterManagerFlag == CM_YARN) {
   // Make sure YARN is included in our build if we're trying to use it
   if (!Utils.classIsLoadable(org.apache.spark.deploy.yarn.Client)  
!Utils.isTesting) {
 printErrorAndExit(
   Could not load YARN classes.  +
   This copy of Spark may not have been compiled with YARN 
support.)
   }
-}
-
-// The following modes are not supported or applicable
-(clusterManager, deployMode) match {
-  case (MESOS, CLUSTER) =
-printErrorAndExit(Cluster deploy mode is currently not supported 
for Mesos clusters.)
-  case (_, CLUSTER) if args.isPython =
-printErrorAndExit(Cluster deploy mode is currently not supported 
for python applications.)
-  case (_, CLUSTER) if isShell(args.primaryResource) =
-printErrorAndExit(Cluster deploy mode is not applicable to Spark 
shells.)
-  case _ =
+  val hasHadoopEnv = sys.env.contains(HADOOP_CONF_DIR) || 
sys.env.contains(YARN_CONF_DIR)
+  if (!hasHadoopEnv  !Utils.isTesting) {
+throw new Exception(When running with master ' + args.master + 
' +
+  either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the 
environment.)
+  }
 }
 
 // If we're running a python app, set the main class to our specific 
python runner
 if (args.isPython) {
   if (args.primaryResource == PYSPARK_SHELL) {
-args.mainClass = py4j.GatewayServer
-args.childArgs = ArrayBuffer(--die-on-broken-pipe, 0)
+args.mainClass = PY4J_GATEWAYSERVER
+args.childArgs = mutable.ArrayBuffer(--die-on-broken-pipe, 0)
   } else {
 // If a python file is provided, add it to the child arguments and 
list of files to deploy.
 // Usage: PythonAppRunner main python file extra python files 
[app arguments]
-args.mainClass = org.apache.spark.deploy.PythonRunner
-args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) 
++ args.childArgs
-args.files = mergeFileLists(args.files, args.primaryResource)
+args.mainClass = PYTHON_RUNNER

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-11-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r19793870
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -33,30 +34,25 @@ import org.apache.spark.util.Utils
  * a layer over the different cluster managers and deploy modes that Spark 
supports.
  */
 object SparkSubmit {
-
-  // Cluster managers
-  private val YARN = 1
-  private val STANDALONE = 2
-  private val MESOS = 4
-  private val LOCAL = 8
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
-
-  // Deploy modes
-  private val CLIENT = 1
-  private val CLUSTER = 2
-  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
-
   // A special jar name that indicates the class being run is inside of 
Spark itself, and therefore
   // no user jar is needed.
-  private val SPARK_INTERNAL = spark-internal
+  val SPARK_INTERNAL = spark-internal
 
   // Special primary resource names that represent shells rather than 
application jars.
-  private val SPARK_SHELL = spark-shell
-  private val PYSPARK_SHELL = pyspark-shell
+  val SPARK_SHELL = spark-shell
+  val PYSPARK_SHELL = pyspark-shell
+
+  // Special python classes
+  val PY4J_GATEWAYSERVER: String = py4j.GatewayServer
+  val PYTHON_RUNNER: String = org.apache.spark.deploy.PythonRunner
--- End diff --

done.


---



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-11-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r19794246
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -83,216 +79,163 @@ object SparkSubmit {
*   (4) the main class for the child
*/
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-  : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], 
String) = {
+  : (mutable.ArrayBuffer[String], mutable.ArrayBuffer[String], 
Map[String, String], String) = {
 
 // Values to return
-val childArgs = new ArrayBuffer[String]()
-val childClasspath = new ArrayBuffer[String]()
-val sysProps = new HashMap[String, String]()
+val childArgs = new mutable.ArrayBuffer[String]()
+val childClasspath = new mutable.ArrayBuffer[String]()
+val sysProps = new mutable.HashMap[String, String]()
 var childMainClass = 
 
-// Set the cluster manager
-val clusterManager: Int = args.master match {
-  case m if m.startsWith(yarn) = YARN
-  case m if m.startsWith(spark) = STANDALONE
-  case m if m.startsWith(mesos) = MESOS
-  case m if m.startsWith(local) = LOCAL
-  case _ = printErrorAndExit(Master must start with yarn, spark, 
mesos, or local); -1
-}
-
-// Set the deploy mode; default is client mode
-var deployMode: Int = args.deployMode match {
-  case client | null = CLIENT
-  case cluster = CLUSTER
-  case _ = printErrorAndExit(Deploy mode must be either client or 
cluster); -1
-}
-
-// Because yarn-cluster and yarn-client encapsulate both the master
-// and deploy mode, we have some logic to infer the master and deploy 
mode
-// from each other if only one is specified, or exit early if they are 
at odds.
-if (clusterManager == YARN) {
-  if (args.master == yarn-standalone) {
-printWarning(\yarn-standalone\ is deprecated. Use 
\yarn-cluster\ instead.)
-args.master = yarn-cluster
-  }
-  (args.master, args.deployMode) match {
-case (yarn-cluster, null) =
-  deployMode = CLUSTER
-case (yarn-cluster, client) =
-  printErrorAndExit(Client deploy mode is not compatible with 
master \yarn-cluster\)
-case (yarn-client, cluster) =
-  printErrorAndExit(Cluster deploy mode is not compatible with 
master \yarn-client\)
-case (_, mode) =
-  args.master = yarn- + Option(mode).getOrElse(client)
-  }
-
+if (args.clusterManagerFlag == CM_YARN) {
   // Make sure YARN is included in our build if we're trying to use it
   if (!Utils.classIsLoadable(org.apache.spark.deploy.yarn.Client)  
!Utils.isTesting) {
 printErrorAndExit(
   Could not load YARN classes.  +
   This copy of Spark may not have been compiled with YARN 
support.)
   }
-}
-
-// The following modes are not supported or applicable
-(clusterManager, deployMode) match {
-  case (MESOS, CLUSTER) =
-printErrorAndExit(Cluster deploy mode is currently not supported 
for Mesos clusters.)
-  case (_, CLUSTER) if args.isPython =
-printErrorAndExit(Cluster deploy mode is currently not supported 
for python applications.)
-  case (_, CLUSTER) if isShell(args.primaryResource) =
-printErrorAndExit(Cluster deploy mode is not applicable to Spark 
shells.)
-  case _ =
+  val hasHadoopEnv = sys.env.contains(HADOOP_CONF_DIR) || 
sys.env.contains(YARN_CONF_DIR)
+  if (!hasHadoopEnv  !Utils.isTesting) {
+throw new Exception(When running with master ' + args.master + 
' +
+  either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the 
environment.)
+  }
 }
 
 // If we're running a python app, set the main class to our specific 
python runner
 if (args.isPython) {
   if (args.primaryResource == PYSPARK_SHELL) {
-args.mainClass = py4j.GatewayServer
-args.childArgs = ArrayBuffer(--die-on-broken-pipe, 0)
+args.mainClass = PY4J_GATEWAYSERVER
+args.childArgs = mutable.ArrayBuffer(--die-on-broken-pipe, 0)
   } else {
 // If a python file is provided, add it to the child arguments and 
list of files to deploy.
 // Usage: PythonAppRunner main python file extra python files 
[app arguments]
-args.mainClass = org.apache.spark.deploy.PythonRunner
-args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) 
++ args.childArgs
-args.files = mergeFileLists(args.files, args.primaryResource)
+args.mainClass = PYTHON_RUNNER

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-11-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r19794638
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,201 +17,286 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io._
 import java.util.jar.JarFile
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection._
 
 import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkSubmitArguments._
 
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- * The env argument is used for testing.
- */
-private[spark] class SparkSubmitArguments(args: Seq[String], env: 
Map[String, String] = sys.env) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-    val defaultProperties = new HashMap[String, String]()
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-    Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
-          defaultProperties(k) = v
-          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-        } else {
-          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-        }
-      }
-    }
-    defaultProperties
+ * Pulls and validates configuration information together in order of 
priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Legacy environment variables
+ * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 5. hard coded defaults
+ *
+*/
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala.
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SPARK_MASTER)
+  def master_= (value: String):Unit = conf.put(SPARK_MASTER, value)
+
+  def executorMemory = conf(SPARK_EXECUTOR_MEMORY)
+  def executorMemory_= (value: String):Unit = 
conf.put(SPARK_EXECUTOR_MEMORY, value)
+
+  def executorCores = conf(SPARK_EXECUTOR_CORES)
+  def executorCores_= (value: String):Unit = 
conf.put(SPARK_EXECUTOR_CORES, value)
+
+  def totalExecutorCores = conf.get(SPARK_CORES_MAX)
+  def totalExecutorCores_= (value: String):Unit = 
conf.put(SPARK_CORES_MAX, value)
+
+  def driverMemory = conf(SPARK_DRIVER_MEMORY)
+  def driverMemory_= (value: String):Unit = conf.put(SPARK_DRIVER_MEMORY, 
value)
+
+  def driverExtraClassPath = conf.get(SPARK_DRIVER_EXTRA_CLASSPATH)
+  def driverExtraClassPath_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_CLASSPATH, value)
+
+  def driverExtraLibraryPath = conf.get(SPARK_DRIVER_EXTRA_LIBRARY_PATH)
+  def driverExtraLibraryPath_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value)
+
+  def driverExtraJavaOptions = conf.get(SPARK_DRIVER_EXTRA_JAVA_OPTIONS)
+  def driverExtraJavaOptions_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value
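
The pattern in this diff is worth calling out: every former var becomes a def getter plus a def name_=(value) setter that delegates to a single conf map, so existing call sites that assign to args.master keep compiling. A small self-contained sketch of the idea (the key names here are illustrative, not the real ConfigConstants):

    import scala.collection.mutable

    // Sketch of "properties backed by a single conf map".
    class ConfBackedArgs {
      val SPARK_MASTER = "spark.master"
      val SPARK_DRIVER_MEMORY = "spark.driver.memory"

      val conf = new mutable.HashMap[String, String]()

      // A def foo / def foo_=(v) pair acts like a field, so callers can still
      // write args.master = "local[*]" while the value lands in the map.
      def master: String = conf(SPARK_MASTER)
      def master_=(value: String): Unit = conf.put(SPARK_MASTER, value)

      def driverMemory: Option[String] = conf.get(SPARK_DRIVER_MEMORY)
      def driverMemory_=(value: String): Unit = conf.put(SPARK_DRIVER_MEMORY, value)
    }

    object ConfBackedArgsDemo extends App {
      val args = new ConfBackedArgs
      args.master = "local[*]"               // goes through master_=
      println(args.conf(args.SPARK_MASTER))  // prints local[*]
    }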

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-11-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r19795115
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,201 +17,286 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io._
 import java.util.jar.JarFile
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection._
 
 import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkSubmitArguments._
 
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- * The env argument is used for testing.
- */
-private[spark] class SparkSubmitArguments(args: Seq[String], env: 
Map[String, String] = sys.env) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-    val defaultProperties = new HashMap[String, String]()
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-    Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
-          defaultProperties(k) = v
-          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-        } else {
-          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-        }
-      }
-    }
-    defaultProperties
+ * Pulls and validates configuration information together in order of 
priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Legacy environment variables
+ * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 5. hard coded defaults
+ *
+*/
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala.
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SPARK_MASTER)
+  def master_= (value: String):Unit = conf.put(SPARK_MASTER, value)
+
+  def executorMemory = conf(SPARK_EXECUTOR_MEMORY)
+  def executorMemory_= (value: String):Unit = 
conf.put(SPARK_EXECUTOR_MEMORY, value)
+
+  def executorCores = conf(SPARK_EXECUTOR_CORES)
+  def executorCores_= (value: String):Unit = 
conf.put(SPARK_EXECUTOR_CORES, value)
+
+  def totalExecutorCores = conf.get(SPARK_CORES_MAX)
+  def totalExecutorCores_= (value: String):Unit = 
conf.put(SPARK_CORES_MAX, value)
+
+  def driverMemory = conf(SPARK_DRIVER_MEMORY)
+  def driverMemory_= (value: String):Unit = conf.put(SPARK_DRIVER_MEMORY, 
value)
+
+  def driverExtraClassPath = conf.get(SPARK_DRIVER_EXTRA_CLASSPATH)
+  def driverExtraClassPath_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_CLASSPATH, value)
+
+  def driverExtraLibraryPath = conf.get(SPARK_DRIVER_EXTRA_LIBRARY_PATH)
+  def driverExtraLibraryPath_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value)
+
+  def driverExtraJavaOptions = conf.get(SPARK_DRIVER_EXTRA_JAVA_OPTIONS)
+  def driverExtraJavaOptions_= (value: String):Unit = 
conf.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-11-04 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r19795245
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -227,91 +312,92 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, St
  */
 def parse(opts: Seq[String]): Unit = opts match {
      case ("--name") :: value :: tail =>
-       name = value
+       cmdLineConfig.put(SPARK_APP_NAME, value)
        parse(tail)

      case ("--master") :: value :: tail =>
-       master = value
+       cmdLineConfig.put(SPARK_MASTER, value)
        parse(tail)

      case ("--class") :: value :: tail =>
-       mainClass = value
+       cmdLineConfig.put(SPARK_APP_CLASS, value)
        parse(tail)

      case ("--deploy-mode") :: value :: tail =>
-       if (value != "client" && value != "cluster") {
-         SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
-       }
-       deployMode = value
+       cmdLineConfig.put(SPARK_DEPLOY_MODE, value)
        parse(tail)

      case ("--num-executors") :: value :: tail =>
-       numExecutors = value
+       cmdLineConfig.put(SPARK_EXECUTOR_INSTANCES, value)
        parse(tail)

      case ("--total-executor-cores") :: value :: tail =>
-       totalExecutorCores = value
+       cmdLineConfig.put(SPARK_CORES_MAX, value)
        parse(tail)

      case ("--executor-cores") :: value :: tail =>
-       executorCores = value
+       cmdLineConfig.put(SPARK_EXECUTOR_CORES, value)
        parse(tail)

      case ("--executor-memory") :: value :: tail =>
-       executorMemory = value
+       cmdLineConfig.put(SPARK_EXECUTOR_MEMORY, value)
        parse(tail)

      case ("--driver-memory") :: value :: tail =>
-       driverMemory = value
+       cmdLineConfig.put(SPARK_DRIVER_MEMORY, value)
        parse(tail)

      case ("--driver-cores") :: value :: tail =>
-       driverCores = value
+       cmdLineConfig.put(SPARK_DRIVER_CORES, value)
        parse(tail)

      case ("--driver-class-path") :: value :: tail =>
-       driverExtraClassPath = value
+       cmdLineConfig.put(SPARK_DRIVER_EXTRA_CLASSPATH, value)
        parse(tail)

      case ("--driver-java-options") :: value :: tail =>
-       driverExtraJavaOptions = value
+       cmdLineConfig.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value)
        parse(tail)

      case ("--driver-library-path") :: value :: tail =>
-       driverExtraLibraryPath = value
+       cmdLineConfig.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value)
        parse(tail)

      case ("--properties-file") :: value :: tail =>
-       propertiesFile = value
+       /*  We merge the property file config options into the rest of the command line options
+        *  after we have finished the rest of the command line processing, as property files
+        *  cannot override explicit command line options.
+        */
+       cmdLinePropertyFileValues ++= Utils.getPropertyValuesFromFile(value)
--- End diff --

I've changed back to the old behaviour with the most recent merge from current, but I will print a warning to the user about what is going on if we detect multiple properties-file options.
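
Roughly, the warning could look like the sketch below. propertyFilesSeen, cmdLinePropertyFileValues and loadPropertyFile are hypothetical stand-ins for the fields and helpers in the patch; only the shape of the check is the point.

    import scala.collection.mutable

    object PropertiesFileMergeSketch {
      val cmdLinePropertyFileValues = new mutable.HashMap[String, String]()
      private val propertyFilesSeen = mutable.ArrayBuffer.empty[String]

      // Read a Java properties file into an immutable Map.
      def loadPropertyFile(path: String): Map[String, String] = {
        val props = new java.util.Properties()
        val in = new java.io.FileInputStream(path)
        try props.load(in) finally in.close()
        import scala.collection.JavaConverters._
        props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k).trim).toMap
      }

      def recordPropertiesFile(path: String): Unit = {
        propertyFilesSeen += path
        if (propertyFilesSeen.size > 1) {
          Console.err.println(
            s"Warning: multiple --properties-file options given (${propertyFilesSeen.mkString(", ")}); " +
            "values are merged, and explicit command-line options still win.")
        }
        // Merged later, after command-line parsing, so property files never
        // override explicit command-line options.
        cmdLinePropertyFileValues ++= loadPropertyFile(path)
      }
    }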





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-10-13 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/2516#issuecomment-58943431
  
Interesting, can you give any references as to what single-slash file URIs mean? Neither RFC 1738 nor RFC 1630 seems to mention them, and http://en.wikipedia.org/wiki/File_URI_scheme only mentions them in passing, noting that some web browsers allow them even though it is against the spec.
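
For what it's worth, java.net.URI is happy to parse all three spellings; the practical difference is whether the text after the slashes is read as an authority (host) or as part of the path. A quick illustration (JDK behaviour, not RFC exegesis):

    import java.net.URI

    object FileUriForms extends App {
      for (s <- Seq("file:/foo/bar", "file:///foo/bar", "file://foo/bar")) {
        val u = new URI(s)
        println(s"$s -> authority=${u.getAuthority}, path=${u.getPath}")
      }
      // Typical output:
      //   file:/foo/bar   -> no authority, path=/foo/bar
      //   file:///foo/bar -> empty authority, path=/foo/bar
      //   file://foo/bar  -> authority "foo" (treated as a host), path=/bar
    }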





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-10-12 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/2516#issuecomment-58775682
  
Hi @vanzin, I've implemented your suggestions, tidied up the code more, and 
also added more unit tests to flesh out the test coverage.
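
One shape such a test can take, sketched against an inlined merge helper rather than the real SparkSubmitArguments code (names and values are illustrative):

    import org.scalatest.FunSuite

    class MergePrioritySuite extends FunSuite {
      test("earlier (higher-priority) maps win over later ones") {
        val cmdLine  = Map("spark.master" -> "yarn-cluster")
        val defaults = Map("spark.master" -> "local[*]", "spark.app.name" -> "demo")
        // Fold so that a key already present (higher priority) is never overwritten.
        val merged = Seq(cmdLine, defaults)
          .foldLeft(Map.empty[String, String])((acc, next) => next ++ acc)
        assert(merged("spark.master") === "yarn-cluster")
        assert(merged("spark.app.name") === "demo")
      }
    }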





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-10-12 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/2516#issuecomment-58775741
  
@andrewor14 we could be stepping on each other's toes soon. I have a query about your work on Utils.ResolveUris: I notice that you produce and test for file URIs with a single forward slash (file:/foo/bar). Shouldn't they have two forward slashes (file://foo/bar)? I've left my unit tests expecting a single forward slash for now.
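
Possibly relevant here: the JDK itself emits the single-slash form when converting a local path to a URI, which may be where the file:/foo/bar style in those tests comes from. A two-line check:

    import java.io.File
    import java.net.URI

    object SingleSlashDemo extends App {
      val u: URI = new File("/foo/bar").toURI
      println(u)          // file:/foo/bar  (single slash, no authority)
      println(u.getPath)  // /foo/bar
    }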





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-10-11 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18745897
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1479,6 +1479,14 @@ private[spark] object Utils extends Logging {
 PropertyConfigurator.configure(pro)
   }
 
+  /**
+   * Flatten a map of maps out into a single map, later maps in the 
propList
--- End diff --

fixed
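
The flattening itself is a one-liner fold. The sketch below assumes the convention used elsewhere in this patch, namely that maps earlier in the list have higher priority, so an earlier value must not be overwritten by a later one; the real Utils helper may order things differently.

    object FlattenMapsSketch extends App {
      def flattenByPriority(propList: Seq[Map[String, String]]): Map[String, String] =
        // "next ++ acc": keys already in the accumulator (higher priority) win
        // over keys coming from lower-priority maps later in the list.
        propList.foldLeft(Map.empty[String, String])((acc, next) => next ++ acc)

      val cmdLine  = Map("spark.master" -> "yarn-cluster")
      val defaults = Map("spark.master" -> "local[*]", "spark.app.name" -> "demo")
      println(flattenByPriority(Seq(cmdLine, defaults)))
      // spark.master resolves to yarn-cluster, spark.app.name to demo
    }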





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-30 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18209006
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(sUsing properties file: 
$propertiesFile)
-Option(propertiesFile).foreach { filename =
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, 
v) =
-if (k.startsWith(spark)) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(sAdding default 
property: $k=$v)
-} else {
-  SparkSubmit.printWarning(sIgnoring non-spark config property: 
$k=$v)
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-29 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18148421
  
--- Diff: 
core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
@@ -0,0 +1,18 @@
+
+spark.master = local[*]
--- End diff --

Ok, will do





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-29 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18148846
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional 
Map[ConfigName-ConfigValue] in order of highest
+   *  priority to lowest this will have priority 
over internal sources
+   * @return Map[propName-propFile] containing values merged from all 
sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
--- End diff --

Ok, nuked the property file read in from the classpath and now using a 
default value map
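
In other words, the lowest-priority source becomes a plain in-code Map rather than a spark-submit-defaults.prop read from the classpath. A sketch of that shape (key names and values are illustrative, not the actual defaults):

    object HardCodedDefaultsSketch {
      val hardCodedDefaults: Map[String, String] = Map(
        "spark.master"          -> "local[*]",
        "spark.executor.memory" -> "1g")

      // Higher-priority sources come first; the defaults map always goes last,
      // so it only fills in keys nothing else has set.
      def merge(sources: Seq[Map[String, String]]): Map[String, String] =
        (sources :+ hardCodedDefaults)
          .foldLeft(Map.empty[String, String])((acc, next) => next ++ acc)
    }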





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-29 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18148885
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/MergablePropertiesTest.scala ---
@@ -0,0 +1,55 @@
+package org.apache.spark.deploy
--- End diff --

added





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-29 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18190759
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional 
Map[ConfigName-ConfigValue] in order of highest
+   *  priority to lowest this will have priority 
over internal sources
+   * @return Map[propName-propFile] containing values merged from all 
sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(sDefault values not found at 
classpath $ClassPathSparkSubmitDefaults)
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq(MASTER-SparkMaster, 
DEPLOY_MODE-SparkDeployMode,
+  SPARK_DRIVER_MEMORY-SparkDriverMemory, 
SPARK_EXECUTOR_MEMORY-SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) = (environmentConfig.get(varName), 
propName) })
+  .filter( {case(varVariable, _) = varVariable.isDefined  
!varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) = (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-29 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18191705
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional 
Map[ConfigName-ConfigValue] in order of highest
+   *  priority to lowest this will have priority 
over internal sources
+   * @return Map[propName-propFile] containing values merged from all 
sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(sDefault values not found at 
classpath $ClassPathSparkSubmitDefaults)
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq(MASTER-SparkMaster, 
DEPLOY_MODE-SparkDeployMode,
+  SPARK_DRIVER_MEMORY-SparkDriverMemory, 
SPARK_EXECUTOR_MEMORY-SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) = (environmentConfig.get(varName), 
propName) })
+  .filter( {case(varVariable, _) = varVariable.isDefined  
!varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) = (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-28 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18128789
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional 
Map[ConfigName-ConfigValue] in order of highest
+   *  priority to lowest this will have priority 
over internal sources
+   * @return Map[propName-propFile] containing values merged from all 
sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(sDefault values not found at 
classpath $ClassPathSparkSubmitDefaults)
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq(MASTER-SparkMaster, 
DEPLOY_MODE-SparkDeployMode,
+  SPARK_DRIVER_MEMORY-SparkDriverMemory, 
SPARK_EXECUTOR_MEMORY-SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) = (environmentConfig.get(varName), 
propName) })
+  .filter( {case(varVariable, _) = varVariable.isDefined  
!varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) = (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-28 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18128856
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional 
Map[ConfigName-ConfigValue] in order of highest
+   *  priority to lowest this will have priority 
over internal sources
+   * @return Map[propName-propFile] containing values merged from all 
sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(sDefault values not found at 
classpath $ClassPathSparkSubmitDefaults)
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq(MASTER-SparkMaster, 
DEPLOY_MODE-SparkDeployMode,
+  SPARK_DRIVER_MEMORY-SparkDriverMemory, 
SPARK_EXECUTOR_MEMORY-SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) = (environmentConfig.get(varName), 
propName) })
+  .filter( {case(varVariable, _) = varVariable.isDefined  
!varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) = (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-28 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18128941
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional 
Map[ConfigName-ConfigValue] in order of highest
+   *  priority to lowest this will have priority 
over internal sources
+   * @return Map[propName-propFile] containing values merged from all 
sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(sDefault values not found at 
classpath $ClassPathSparkSubmitDefaults)
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+    val legacyEnvVars = Seq("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode,
+      "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
--- End diff --

OK, my way was even getting me confused. Let's use your suggested code and treat legacy env variables at the same priority as normal environment variables.
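
A sketch of what that ends up looking like: each legacy variable is mapped onto its spark property name and folded into the same priority level as the rest of the environment (the property names below are stand-ins for the real ConfigConstants values).

    object LegacyEnvVarsSketch {
      val legacyEnvVars = Seq(
        "MASTER"                -> "spark.master",
        "DEPLOY_MODE"           -> "spark.deploy.mode",
        "SPARK_DRIVER_MEMORY"   -> "spark.driver.memory",
        "SPARK_EXECUTOR_MEMORY" -> "spark.executor.memory")

      def environmentConfig(env: Map[String, String]): Map[String, String] = {
        val legacy = for {
          (envName, propName) <- legacyEnvVars
          value <- env.get(envName) if value.nonEmpty
        } yield propName -> value
        // Legacy mappings sit at the same priority as any other environment variable.
        env ++ legacy
      }
    }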



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-28 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18133582
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(sUsing properties file: 
$propertiesFile)
-Option(propertiesFile).foreach { filename =
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, 
v) =
-if (k.startsWith(spark)) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(sAdding default 
property: $k=$v)
-} else {
-  SparkSubmit.printWarning(sIgnoring non-spark config property: 
$k=$v)
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-28 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18133601
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional 
Map[ConfigName-ConfigValue] in order of highest
+   *  priority to lowest this will have priority 
over internal sources
+   * @return Map[propName-propFile] containing values merged from all 
sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(sDefault values not found at 
classpath $ClassPathSparkSubmitDefaults)
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq(MASTER-SparkMaster, 
DEPLOY_MODE-SparkDeployMode,
+  SPARK_DRIVER_MEMORY-SparkDriverMemory, 
SPARK_EXECUTOR_MEMORY-SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) = (environmentConfig.get(varName), 
propName) })
+  .filter( {case(varVariable, _) = varVariable.isDefined  
!varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) = (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
--- End diff --

changed



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-28 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18134461
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves configuration sources in order of highest to lowest priority:
+   * 1. Each map passed in as additionalConfigs, from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (e.g. by using -Dspark.var.name)
+   * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf
+   * 5. Hard-coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read in and the properties are
+   * considered to be at the priority of the method that specified the file.
+   * A property specified in a property file will not override an existing
+   * config value at that same level.
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName -> ConfigValue] in order of highest
+   *  priority to lowest; these will have priority over internal sources
+   * @return Map[propName -> propValue] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq[Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-  case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
+  is = Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode,
+  "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) => (environmentConfig.get(varName), propName) })
+  .filter( {case(varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) => (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file
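
To make the priority contract above concrete, here is a small standalone sketch of a first-hit-wins
merge over sources ordered from highest to lowest priority. It is an illustration only; the PR
delegates the real work to MergedPropertyMap.mergePropertyMaps, whose implementation may differ.

object PriorityMergeSketch {
  // For each key, the first (highest-priority) source that defines it wins.
  def mergePropertyMaps(sources: Seq[Map[String, String]]): Map[String, String] =
    sources.foldLeft(Map.empty[String, String]) { (acc, lowerPriority) =>
      // `lowerPriority ++ acc`: on a key clash the right-hand side, i.e. the
      // already-resolved higher-priority value, is kept.
      lowerPriority ++ acc
    }

  def main(args: Array[String]): Unit = {
    val commandLine      = Map("spark.master" -> "yarn")
    val sparkDefaults    = Map("spark.master" -> "local[2]", "spark.executor.memory" -> "2g")
    val hardCodedDefault = Map("spark.master" -> "local[*]", "spark.executor.memory" -> "1g")

    val merged = mergePropertyMaps(Seq(commandLine, sparkDefaults, hardCodedDefault))
    println(merged)  // spark.master resolves to "yarn", spark.executor.memory to "2g"
  }
}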

[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126499
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Address, ActorRef}
+
+private[cluster] class ExecutorData(
--- End diff --

Done





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126510
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Address, ActorRef}
+
+private[cluster] class ExecutorData(
+   var executorActor: ActorRef,
+   var executorAddress: Address,
+   var executorHost: String ,
+   var freeCores: Int,
+   var totalCores: Int
+) {}
--- End diff --

done





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126545
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -104,13 +96,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
   case StatusUpdate(executorId, taskId, state, data) =>
 scheduler.statusUpdate(taskId, state, data.value)
 if (TaskState.isFinished(state)) {
-  if (executorActor.contains(executorId)) {
-freeCores(executorId) += scheduler.CPUS_PER_TASK
-makeOffers(executorId)
-  } else {
-// Ignoring the update since we don't know about the executor.
-val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
-logWarning(msg.format(taskId, state, sender, executorId))
+  executorDataMap.get(executorId) match {
+case Some(executorInfo) =>
+  executorInfo.freeCores += scheduler.CPUS_PER_TASK
+  makeOffers(executorId)
+case None =>
+  // Ignoring the update since we don't know about the executor.
+  val msg = "Ignored task status update (%d state %s) " +
--- End diff --

Done and replaced format with a 's' interpolated string.
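
For readers skimming the thread, the change amounts to the difference below (values are made up;
paste into a Scala REPL to check that both forms build the same string):

// Illustration only: the same warning built with format(...) versus an s-interpolated string.
val taskId = 42L
val state = "FINISHED"
val executorId = "exec-1"

val viaFormat = "Ignored task status update (%d state %s) from unknown executor with ID %s".format(taskId, state, executorId)
val viaInterpolation = s"Ignored task status update ($taskId state $state) from unknown executor with ID $executorId"

assert(viaFormat == viaInterpolation)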





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126658
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Address, ActorRef}
+
+private[cluster] class ExecutorData(
+   var executorActor: ActorRef,
--- End diff --

Good point - All but freeCores changed to vals





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126663
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -126,8 +120,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 
   case StopExecutors =
 logInfo(Asking each executor to shut down)
-for (executor - executorActor.values) {
-  executor ! StopExecutor
+for ((_,executorData) - executorDataMap) {
--- End diff --

Done





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126665
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -85,16 +79,14 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 def receiveWithLogging = {
   case RegisterExecutor(executorId, hostPort, cores) =>
 Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
-if (executorActor.contains(executorId)) {
+if (executorDataMap.contains(executorId)) {
   sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
 } else {
   logInfo("Registered executor: " + sender + " with ID " + executorId)
   sender ! RegisteredExecutor
-  executorActor(executorId) = sender
-  executorHost(executorId) = Utils.parseHostPort(hostPort)._1
-  totalCores(executorId) = cores
-  freeCores(executorId) = cores
-  executorAddress(executorId) = sender.path.address
+  executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
--- End diff --

Done





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126667
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -297,6 +291,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
   }
 }
 
+
--- End diff --

removed





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126669
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -179,25 +176,22 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
   }
 }
 else {
-  freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-  executorActor(task.executorId) ! LaunchTask(new 
SerializableBuffer(serializedTask))
+  val executorInfo = executorDataMap(task.executorId)
+  executorInfo.freeCores -= scheduler.CPUS_PER_TASK
+  executorInfo.executorActor ! LaunchTask(new 
SerializableBuffer(serializedTask))
 }
   }
 }
 
 // Remove a disconnected slave from the cluster
 def removeExecutor(executorId: String, reason: String) {
-  if (executorActor.contains(executorId)) {
-logInfo(Executor  + executorId +  disconnected, so removing it)
-val numCores = totalCores(executorId)
-executorActor -= executorId
-executorHost -= executorId
-addressToExecutorId -= executorAddress(executorId)
-executorAddress -= executorId
-totalCores -= executorId
-freeCores -= executorId
-totalCoreCount.addAndGet(-numCores)
-scheduler.executorLost(executorId, SlaveLost(reason))
+  executorDataMap.get(executorId) match {
+case Some(executorInfo) =
+  val numCores = executorInfo.totalCores
--- End diff --

expression inlined





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126677
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -149,13 +144,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 // Make fake resource offers on all executors
 def makeOffers() {
   launchTasks(scheduler.resourceOffers(
-executorHost.toArray.map {case (id, host) = new WorkerOffer(id, 
host, freeCores(id))}))
+executorDataMap.map{ case(id, executorData) =
--- End diff --

done





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-27 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18126678
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -149,13 +144,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 // Make fake resource offers on all executors
 def makeOffers() {
   launchTasks(scheduler.resourceOffers(
-executorHost.toArray.map {case (id, host) = new WorkerOffer(id, 
host, freeCores(id))}))
+executorDataMap.map{ case(id, executorData) =
+  new WorkerOffer( id, executorData.executorHost, 
executorData.freeCores)}.toSeq))
--- End diff --

done





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on the pull request:

https://github.com/apache/spark/pull/2516#issuecomment-56777816
  
vanzin - great feedback. Thanks for the effort of going through the code.

I've implemented all the requested changes.





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
GitHub user tigerquoll opened a pull request:

https://github.com/apache/spark/pull/2533

SPARK-CORE [SPARK-3651] Group common CoarseGrainedSchedulerBackend 
variables together

from [SPARK-3651]
In CoarseGrainedSchedulerBackend, we have:

private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
private val totalCores = new HashMap[String, Int]

We only ever put / remove stuff from these maps together. It would simplify 
the code if we consolidate these all into one map as we have done in 
JobProgressListener in https://issues.apache.org/jira/browse/SPARK-2299.
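
A rough, self-contained sketch of what that consolidation looks like is below. The field names
follow the discussion in this PR, but plain Strings stand in for the Akka ActorRef/Address types so
the snippet compiles on its own.

import scala.collection.mutable

// One value class instead of five parallel maps. Plain Strings replace the
// Akka ActorRef/Address held by the real ExecutorData.
class ExecutorDataSketch(
    val executorActor: String,    // ActorRef in the PR
    val executorAddress: String,  // akka.actor.Address in the PR
    val executorHost: String,
    var freeCores: Int,
    val totalCores: Int)

object ExecutorDataMapSketch {
  private val executorDataMap = new mutable.HashMap[String, ExecutorDataSketch]

  def main(args: Array[String]): Unit = {
    // Registration, update and removal each touch a single entry,
    // rather than five maps that must be kept in lock-step.
    executorDataMap.put("exec-1", new ExecutorDataSketch("actorRef", "akka.tcp://host:1234", "host", 4, 4))
    executorDataMap.get("exec-1").foreach(_.freeCores -= 1)
    executorDataMap.remove("exec-1")
  }
}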

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tigerquoll/spark-tigerquoll SPARK-3651

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2533.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2533


commit 7d671cf230bdad22f42f336174e8e0a8f7bc267b
Author: Dale tigerqu...@outlook.com
Date:   2014-09-25T10:46:30Z

[SPARK-3651]  Grouped variables under a ExecutorDataObject, and reference 
them via a map entry as they are all retrieved under the same key







[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18065148
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -62,15 +62,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
   val createTime = System.currentTimeMillis()
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor 
with ActorLogReceive {
-
 override protected def log = CoarseGrainedSchedulerBackend.this.log
-
-private val executorActor = new HashMap[String, ActorRef]
-private val executorAddress = new HashMap[String, Address]
-private val executorHost = new HashMap[String, String]
-private val freeCores = new HashMap[String, Int]
-private val totalCores = new HashMap[String, Int]
 private val addressToExecutorId = new HashMap[Address, String]
+private val executorData = new HashMap[String, ExecutorData]
--- End diff --

changed to
executorDataMap = new HashMap[String, ExecutorData]





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18065365
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -85,16 +79,18 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 def receiveWithLogging = {
   case RegisterExecutor(executorId, hostPort, cores) =
 Utils.checkHostPort(hostPort, Host port expected  + hostPort)
-if (executorActor.contains(executorId)) {
+if (executorData.contains(executorId)) {
   sender ! RegisterExecutorFailed(Duplicate executor ID:  + 
executorId)
 } else {
   logInfo(Registered executor:  + sender +  with ID  + 
executorId)
   sender ! RegisteredExecutor
-  executorActor(executorId) = sender
-  executorHost(executorId) = Utils.parseHostPort(hostPort)._1
-  totalCores(executorId) = cores
-  freeCores(executorId) = cores
-  executorAddress(executorId) = sender.path.address
+  executorData.put(executorId, new ExecutorData(
--- End diff --

removed parameter names and re-ordered parameters





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18065400
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -104,13 +100,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
   case StatusUpdate(executorId, taskId, state, data) =
 scheduler.statusUpdate(taskId, state, data.value)
 if (TaskState.isFinished(state)) {
-  if (executorActor.contains(executorId)) {
-freeCores(executorId) += scheduler.CPUS_PER_TASK
-makeOffers(executorId)
-  } else {
-// Ignoring the update since we don't know about the executor.
-val msg = Ignored task status update (%d state %s) from 
unknown executor %s with ID %s
-logWarning(msg.format(taskId, state, sender, executorId))
+  executorData.get(executorId) match {
+case Some(executorInfo) =
--- End diff --

The map is now called ExecutorDataMap





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18065662
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -126,8 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 
   case StopExecutors =
 logInfo(Asking each executor to shut down)
-for (executor - executorActor.values) {
-  executor ! StopExecutor
+executorData.foreach { case(k,v)  =
--- End diff --

done





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18065891
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -149,13 +147,14 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 // Make fake resource offers on all executors
 def makeOffers() {
   launchTasks(scheduler.resourceOffers(
-executorHost.toArray.map {case (id, host) = new WorkerOffer(id, 
host, freeCores(id))}))
+executorData.map{ case(k,v) = new WorkerOffer( k, v.executorHost, 
v.freeCores)}.toSeq))
--- End diff --

done





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18066183
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -179,25 +178,22 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
   }
 }
 else {
-  freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-  executorActor(task.executorId) ! LaunchTask(new 
SerializableBuffer(serializedTask))
+  val executorInfo = executorData(task.executorId)
+  executorInfo.freeCores -= scheduler.CPUS_PER_TASK
+  executorInfo.executorActor ! LaunchTask(new 
SerializableBuffer(serializedTask))
 }
   }
 }
 
 // Remove a disconnected slave from the cluster
 def removeExecutor(executorId: String, reason: String) {
-  if (executorActor.contains(executorId)) {
-logInfo(Executor  + executorId +  disconnected, so removing it)
-val numCores = totalCores(executorId)
-executorActor -= executorId
-executorHost -= executorId
-addressToExecutorId -= executorAddress(executorId)
-executorAddress -= executorId
-totalCores -= executorId
-freeCores -= executorId
-totalCoreCount.addAndGet(-numCores)
-scheduler.executorLost(executorId, SlaveLost(reason))
+  executorData.get(executorId) match {
--- End diff --

Pattern matching on an Option is a clear and well-understood pattern. Please let clarity rule over conciseness in this case.
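
For readers outside the review, the two styles being weighed look roughly like this (sketched
against a plain Map rather than the scheduler's real state; both behave the same):

val executorCores = Map("exec-1" -> 4)  // stand-in for the scheduler's executor state

// Explicit match: both branches are visible at a glance.
executorCores.get("exec-1") match {
  case Some(cores) => println(s"removing executor with $cores cores")
  case None        => println("unknown executor, ignoring removal")
}

// Equivalent combinator form: shorter, but the 'missing' case is easier to overlook.
executorCores.get("exec-1").fold(println("unknown executor, ignoring removal")) { cores =>
  println(s"removing executor with $cores cores")
}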





[GitHub] spark pull request: SPARK-CORE [SPARK-3651] Group common CoarseGra...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2533#discussion_r18066186
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -179,25 +178,22 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
   }
 }
 else {
-  freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-  executorActor(task.executorId) ! LaunchTask(new 
SerializableBuffer(serializedTask))
+  val executorInfo = executorData(task.executorId)
+  executorInfo.freeCores -= scheduler.CPUS_PER_TASK
+  executorInfo.executorActor ! LaunchTask(new 
SerializableBuffer(serializedTask))
 }
   }
 }
 
 // Remove a disconnected slave from the cluster
 def removeExecutor(executorId: String, reason: String) {
-  if (executorActor.contains(executorId)) {
-logInfo(Executor  + executorId +  disconnected, so removing it)
-val numCores = totalCores(executorId)
-executorActor -= executorId
-executorHost -= executorId
-addressToExecutorId -= executorAddress(executorId)
-executorAddress -= executorId
-totalCores -= executorId
-freeCores -= executorId
-totalCoreCount.addAndGet(-numCores)
-scheduler.executorLost(executorId, SlaveLost(reason))
+  executorData.get(executorId) match {
+case Some(executorInfo) =
+  val numCores = executorInfo.totalCores
+  executorData.-=(executorId)
--- End diff --

typo fixed





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18066533
  
--- Diff: 
core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
@@ -0,0 +1,18 @@
+
+spark.master = local[*]
--- End diff --

It could, but if this PR gets accepted I'm thinking of extending the concept to other configuration properties as well, many of which have default values sitting buried in other parts of the code.
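
For context, reading such hard-coded defaults off the classpath boils down to something like the
sketch below. It is a simplified standalone illustration, assuming a resource at
org/apache/spark/deploy/spark-submit-defaults.prop; the PR's actual helpers add Option-based stream
handling and the CharEncoding constants shown elsewhere in the diff.

import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.util.Properties
import scala.collection.JavaConverters._

object ClasspathDefaultsSketch {
  // Returns an empty map if the resource is missing; the PR instead fails fast with
  // an IllegalStateException when the hard-coded defaults cannot be found.
  def loadDefaults(resource: String = "org/apache/spark/deploy/spark-submit-defaults.prop"): Map[String, String] = {
    val stream = Option(Thread.currentThread().getContextClassLoader.getResourceAsStream(resource))
    stream.map { s =>
      val reader = new InputStreamReader(s, StandardCharsets.UTF_8)
      try {
        val props = new Properties()
        props.load(reader)
        props.stringPropertyNames().asScala.map(k => k -> props.getProperty(k).trim).toMap
      } finally reader.close()
    }.getOrElse(Map.empty)
  }

  def main(args: Array[String]): Unit = println(loadDefaults())
}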





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18066741
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -330,13 +330,13 @@ private[spark] object SparkConf {
*/
   def isExecutorStartupConf(name: String): Boolean = {
 isAkkaConf(name) ||
-name.startsWith(spark.akka) ||
-name.startsWith(spark.auth) ||
-isSparkPortConf(name)
+  name.startsWith(spark.akka) ||
+  name.startsWith(spark.auth) ||
+  isSparkPortConf(name)
   }
 
   /**
* Return whether the given config is a Spark port config.
*/
   def isSparkPortConf(name: String): Boolean = name.startsWith(spark.) 
 name.endsWith(.port)
-}
+}
--- End diff --

reverted the file





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18067374
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +413,166 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
+
 object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), sProperties file $file does not exist)
-require(file.isFile(), sProperties file $file is not a normal file)
-val inputStream = new FileInputStream(file)
-try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
-} finally {
-  inputStream.close()
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs additional Map[ConfigName -> ConfigValue] in order of highest
+   *  priority to lowest
+   * @return Map[propName -> propValue] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Vector[Map[String,String]]) 
= {
+
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+val is = Option(Thread.currentThread().getContextClassLoader()
+  .getResourceAsStream(SparkSubmitDefaults))
+
+val hardCodedDefaultConfig = is.flatMap{x =>
+  Some(SparkSubmitArguments.getPropertyValuesFromStream(x))}
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size == 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $SparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))
+{
+SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = List("MASTER" -> SparkMaster, "DEPLOY_MODE" -> SparkDeployMode,
+  "SPARK_DRIVER_MEMORY" -> SparkDriverMemory, "SPARK_EXECUTOR_MEMORY" -> SparkExecutorMemory)
+
+
+// legacy variables act at the priority of a system property
+systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) => (environmentConfig.get(varName), propName) })
+  .filter( {case(varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) => (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Vector (
+  environmentConfig,
+  systemPropertyConfig,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file at priority level of source that specified the 
property file
+// loaded property file configs will not override existing configs at 
the priority
+// level the property file was specified at
+val processedConfigSource = ConfigSources
+  .map( configMap = getFileBasedPropertiesIfSpecified(configMap) ++ 
configMap)
+
+val test = MergedPropertyMap.mergePropertyMaps(processedConfigSource)
+
+test
+  }
+
+  /**
+   * Returns a map of config values from a property file if
+   * the passed configMap has a SparkPropertiesFile defined pointing

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18067408
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -57,6 +57,10 @@ object SparkSubmit {
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
   // Exposed for testing
+  // testing currently disabled exitFn() from working, so we need to stop 
execution
--- End diff --

Done





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18067608
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -288,11 +291,11 @@ object SparkSubmit {
   }
 
   private def launch(
-  childArgs: ArrayBuffer[String],
-  childClasspath: ArrayBuffer[String],
-  sysProps: Map[String, String],
-  childMainClass: String,
-  verbose: Boolean = false) {
+  childArgs: mutable.ArrayBuffer[String],
--- End diff --

Double-checked that IntelliJ's Scala plugin is set to 2-space tabs. I've deleted the spaces and re-indented - let's see if that solves it.





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18067705
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-Option(propertiesFile).foreach { filename =>
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-if (k.startsWith("spark")) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-} else {
-  SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath
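
For anyone unfamiliar with the accessor pattern in this hunk: each `def x` / `def x_=` pair is a
Scala getter/setter backed by the single conf map, so assignments like `args.master = "yarn"` end
up as map puts. A stripped-down sketch follows; the key constants here are placeholders, not the
real values from ConfigConstants.scala.

import scala.collection.mutable

class SubmitArgsSketch {
  val conf = new mutable.HashMap[String, String]()

  // Placeholder key names (assumption), standing in for ConfigConstants members.
  private val SparkMaster = "spark.master"
  private val SparkDeployMode = "spark.submit.deployMode"

  def master: String = conf(SparkMaster)
  def master_=(value: String): Unit = conf.put(SparkMaster, value)

  def deployMode: String = conf(SparkDeployMode)
  def deployMode_=(value: String): Unit = conf.put(SparkDeployMode, value)
}

object SubmitArgsSketchDemo {
  def main(args: Array[String]): Unit = {
    val a = new SubmitArgsSketch
    a.master = "yarn"        // sugar for a.master_=("yarn"), i.e. a conf.put
    a.deployMode = "cluster"
    println(a.conf)          // e.g. Map(spark.master -> yarn, spark.submit.deployMode -> cluster)
  }
}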

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18067718
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(sUsing properties file: 
$propertiesFile)
-Option(propertiesFile).foreach { filename =
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, 
v) =
-if (k.startsWith(spark)) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(sAdding default 
property: $k=$v)
-} else {
-  SparkSubmit.printWarning(sIgnoring non-spark config property: 
$k=$v)
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18067811
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(sUsing properties file: 
$propertiesFile)
-Option(propertiesFile).foreach { filename =
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, 
v) =
-if (k.startsWith(spark)) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(sAdding default 
property: $k=$v)
-} else {
-  SparkSubmit.printWarning(sIgnoring non-spark config property: 
$k=$v)
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18067818
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-Option(propertiesFile).foreach { filename =>
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-if (k.startsWith("spark")) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-} else {
-  SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068019
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-Option(propertiesFile).foreach { filename =>
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-if (k.startsWith("spark")) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-} else {
-  SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068346
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-Option(propertiesFile).foreach { filename =>
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-if (k.startsWith("spark")) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-} else {
-  SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068465
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-Option(propertiesFile).foreach { filename =>
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-if (k.startsWith("spark")) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-} else {
-  SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068498
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,155 +17,195 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io.{InputStreamReader, File, FileInputStream, InputStream}
 import java.util.jar.JarFile
+import java.util.Properties
 
+import scala.collection._
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import org.apache.commons.lang3.CharEncoding
 
-import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
 
+
+
 /**
- * Parses and encapsulates arguments from the spark-submit script.
- */
+ * Pulls configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Environment variables (including legacy variable mappings)
+ * 4. System config variables (eg by using -Dspark.var.name)
+ * 5  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 6. hard coded defaults in class path at spark-submit-defaults.prop
+ *
+ * A property file specified by one of the means listed above gets read in 
and the properties are
+ * considered to be at the priority of the method that specified the 
files. A property specified in
+ * a property file will not override an existing config value at that same 
level
+*/
 private[spark] class SparkSubmitArguments(args: Seq[String]) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, 
String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-val defaultProperties = new HashMap[String, String]()
-if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-Option(propertiesFile).foreach { filename =>
-  val file = new File(filename)
-  SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-if (k.startsWith("spark")) {
-  defaultProperties(k) = v
-  if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-} else {
-  SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-}
-  }
-}
-defaultProperties
-  }
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master  = conf(SparkMaster)
+  def master_= (value: String):Unit = conf.put(SparkMaster, value)
+
+  def deployMode = conf(SparkDeployMode)
+  def deployMode_= (value: String):Unit = conf.put(SparkDeployMode, value)
+
+  def executorMemory = conf(SparkExecutorMemory)
+  def executorMemory_= (value: String):Unit = 
conf.put(SparkExecutorMemory, value)
+
+  def executorCores = conf(SparkExecutorCores)
+  def executorCores_= (value: String):Unit = conf.put(SparkExecutorCores, 
value)
+
+  def totalExecutorCores = conf.get(SparkCoresMax)
+  def totalExecutorCores_= (value: String):Unit = conf.put(SparkCoresMax, 
value)
+
+  def driverMemory = conf(SparkDriverMemory)
+  def driverMemory_= (value: String):Unit = conf.put(SparkDriverMemory, 
value)
+
+  def driverExtraClassPath

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068639
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -188,41 +228,16 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
  "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
   }
 }
+
   }
 
   override def toString =  {
-s"""Parsed arguments:
-|  master  $master
-|  deployMode  $deployMode
-|  executorMemory  $executorMemory
-|  executorCores   $executorCores
-|  totalExecutorCores  $totalExecutorCores
-|  propertiesFile  $propertiesFile
-|  extraSparkProperties$sparkProperties
-|  driverMemory$driverMemory
-|  driverCores $driverCores
-|  driverExtraClassPath$driverExtraClassPath
-|  driverExtraLibraryPath  $driverExtraLibraryPath
-|  driverExtraJavaOptions  $driverExtraJavaOptions
-|  supervise   $supervise
-|  queue   $queue
-|  numExecutors$numExecutors
-|  files   $files
-|  pyFiles $pyFiles
-|  archives$archives
-|  mainClass   $mainClass
-|  primaryResource $primaryResource
-|  name$name
-|  childArgs   [${childArgs.mkString(" ")}]
-|  jars$jars
-|  verbose $verbose
-|
-|Default properties from $propertiesFile:
-|${defaultSparkProperties.mkString("  ", "\n  ", "\n")}
-""".stripMargin
+conf.mkString("\n")
   }
 
-  /** Fill in values by parsing user options. */
+  /**
--- End diff --

reverted


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068723
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName->ConfigValue] in order of highest
+   *  priority to lowest this will have priority over internal sources
+   * @return Map[propName->propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-  case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
--- End diff --

Wish I'd seen that one before
Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068816
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName->ConfigValue] in order of highest
+   *  priority to lowest this will have priority over internal sources
+   * @return Map[propName->propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-  case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
--- End diff --

Line removed completely. All sources have their property files loaded and the properties merged later on in the function.
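For illustration, a rough sketch of that merge rule, assuming a hypothetical `spark.properties.file` key and a caller-supplied loader; entries from the file are added first so that values already present at the same level win:

    def withPropertyFile(source: Map[String, String],
                         loadFile: String => Map[String, String]): Map[String, String] =
      // file entries first, source entries second: the source's own values take precedence
      source.get("spark.properties.file").map(loadFile).getOrElse(Map.empty) ++ source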


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068897
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName->ConfigValue] in order of highest
+   *  priority to lowest this will have priority over internal sources
+   * @return Map[propName->propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k = (k, 
properties(k).trim))
-} catch {
-  case e: IOException =
-val message = sFailed when loading Spark properties file $file
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq("MASTER"->SparkMaster, "DEPLOY_MODE"->SparkDeployMode,
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18068968
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName->ConfigValue] in order of highest
+   *  priority to lowest this will have priority over internal sources
+   * @return Map[propName->propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-  case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq("MASTER"->SparkMaster, "DEPLOY_MODE"->SparkDeployMode,
+  "SPARK_DRIVER_MEMORY"->SparkDriverMemory, "SPARK_EXECUTOR_MEMORY"->SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
--- End diff --

To keep things cleaner, I was hoping to only pull information from the various sources and produce a map at the end - what the caller chooses to do with the map of values (push to system props, or env props, or cmd line arguments etc.) would be up to the caller.
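A small sketch of that separation, with hypothetical names: the merge only returns a Map, and pushing values into system properties (or anywhere else) is a separate caller-side step:

    // one possible caller policy: copy merged values into system properties
    // without clobbering anything already set there
    def applyToSystemProps(merged: Map[String, String]): Unit =
      merged.foreach { case (k, v) =>
        if (!sys.props.contains(k)) sys.props(k) = v
      }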


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18069354
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName->ConfigValue] in order of highest
+   *  priority to lowest this will have priority over internal sources
+   * @return Map[propName->propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-  case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq("MASTER"->SparkMaster, "DEPLOY_MODE"->SparkDeployMode,
+  "SPARK_DRIVER_MEMORY"->SparkDriverMemory, "SPARK_EXECUTOR_MEMORY"->SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) => (environmentConfig.get(varName), propName) })
+  .filter( {case(varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) => (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18070264
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName->ConfigValue] in order of highest
+   *  priority to lowest this will have priority over internal sources
+   * @return Map[propName->propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-  case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq("MASTER"->SparkMaster, "DEPLOY_MODE"->SparkDeployMode,
+  "SPARK_DRIVER_MEMORY"->SparkDriverMemory, "SPARK_EXECUTOR_MEMORY"->SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) => (environmentConfig.get(varName), propName) })
+  .filter( {case(varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) => (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-25 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18070363
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -406,22 +412,173 @@ private[spark] class SparkSubmitArguments(args: 
Seq[String]) {
   }
 }
 
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
+private[spark] object SparkSubmitArguments {
+  /**
+   * Resolves Configuration sources in order of highest to lowest
+   * 1. Each map passed in as additionalConfig from first to last
+   * 2. Environment variables (including legacy variable mappings)
+   * 3. System config variables (eg by using -Dspark.var.name)
+   * 4  SPARK_DEFAULT_CONF/spark-defaults.conf or 
SPARK_HOME/conf/spark-defaults.conf
+   * 5. hard coded defaults in class path at spark-submit-defaults.prop
+   *
+   * A property file specified by one of the means listed above gets read 
in and the properties are
+   * considered to be at the priority of the method that specified the 
files.
+   * A property specified in a property file will not override an existing
+   * config value at that same level
+   *
+   * @param additionalConfigs Seq of additional Map[ConfigName->ConfigValue] in order of highest
+   *  priority to lowest this will have priority over internal sources
+   * @return Map[propName->propFile] containing values merged from all sources in order of priority
+   */
+  def mergeSparkProperties(additionalConfigs: Seq [Map[String,String]]) = {
+// Configuration read in from spark-submit-defaults.prop file found on 
the classpath
+var hardCodedDefaultConfig: Option[Map[String,String]] = None
+var is: InputStream = null
+var isr: Option[InputStreamReader] = None
 try {
-  val properties = new Properties()
-  properties.load(inputStream)
-  properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-  case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
+  is = 
Thread.currentThread().getContextClassLoader.getResourceAsStream(ClassPathSparkSubmitDefaults)
+
+  // only open InputStreamReader if InputStream was successfully opened
+  isr = Option(is).map{is: InputStream =>
+new InputStreamReader(is, CharEncoding.UTF_8)
+  }
+
+  hardCodedDefaultConfig = isr.map( defaultValueStream =>
+
SparkSubmitArguments.getPropertyValuesFromStream(defaultValueStream))
 } finally {
-  inputStream.close()
+  Option(is).foreach(_.close)
+  isr.foreach(_.close)
 }
+
+if (hardCodedDefaultConfig.isEmpty || (hardCodedDefaultConfig.get.size 
== 0)) {
+  throw new IllegalStateException(s"Default values not found at classpath $ClassPathSparkSubmitDefaults")
+}
+
+// Configuration read in from defaults file if it exists
+var sparkDefaultConfig = SparkSubmitArguments.getSparkDefaultFileConfig
+
+if (sparkDefaultConfig.isDefinedAt(SparkPropertiesFile))  {
+  SparkSubmitArguments.getPropertyValuesFromFile(
+  sparkDefaultConfig.get(SparkPropertiesFile).get)
+} else {
+  Map.empty
+}
+
+// Configuration from java system properties
+val systemPropertyConfig = 
SparkSubmitArguments.getPropertyMap(System.getProperties)
+
+// Configuration variables from the environment
+// support legacy variables
+val environmentConfig = System.getenv().asScala
+
+val legacyEnvVars = Seq("MASTER"->SparkMaster, "DEPLOY_MODE"->SparkDeployMode,
+  "SPARK_DRIVER_MEMORY"->SparkDriverMemory, "SPARK_EXECUTOR_MEMORY"->SparkExecutorMemory)
+
+// legacy variables act at the priority of a system property
+val propsWithEnvVars : mutable.Map[String,String] = new 
mutable.HashMap() ++ systemPropertyConfig ++ legacyEnvVars
+  .map( {case(varName, propName) => (environmentConfig.get(varName), propName) })
+  .filter( {case(varVariable, _) => varVariable.isDefined && !varVariable.get.isEmpty} )
+  .map{case(varVariable, propName) => (propName, varVariable.get)}
+
+val ConfigSources  = additionalConfigs ++ Seq (
+  environmentConfig,
+  propsWithEnvVars,
+  sparkDefaultConfig,
+  hardCodedDefaultConfig.get
+)
+
+// Load properties file

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
GitHub user tigerquoll opened a pull request:

https://github.com/apache/spark/pull/2516

Spark Core - [SPARK-3620] - Refactor of SparkSubmit Argument parsing code

Argument processing seems to have gotten a lot of attention lately, so I 
thought I might throw my contribution into the ring.  Attached for 
consideration and to prompt discussion is a revamp of argument handling in 
SparkSubmit aimed at making things a lot more consistent. The only things that have been modified are the way that configuration properties are read, processed, and prioritised.

Things to note include:
* All configuration parameters can now be consistently set via config file

* Configuration parameter defaults have been removed from the code, and 
placed into a property file which is read from the class path on startup.  
There should be no need to trace through 5 files to see what a config parameter 
defaults to if it is not specified, or have different default values applied in 
multiple places throughout the code.

* Configuration parameter validation is now done once all configuration 
parameters have been read in and resolved from various locations, not just when 
reading the command line.

* All property files (including spark_default_conf) are parsed by Java 
property handling code. All custom parsing code has been removed. Escaping of 
characters should now be consistent everywhere.

* All configuration parameters are overridden in the same consistent way - configuration parameters for SparkSubmit are pulled from the following sources in order of priority:
 1. Entries specified on the command line (except from --conf entries)
 2. Entries specified on the command line with --conf
 3. Environment variables (including legacy variable mappings)
 4. System config variables (eg by using -Dspark.var.name)
 5. $(SPARK_DEFAULT_CONF)/spark-defaults.conf or 
$(SPARK_HOME)/conf/spark-defaults.conf if either exist
 6. Hard coded defaults in class path at spark-submit-defaults.prop

* A property file specified by one of the sources listed above gets read in 
and the properties are considered to be at the priority of the configuration 
source that specified the file. A property specified in a property file will not override an existing config value already specified by that configuration source.
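A minimal sketch of the priority order above (plain Scala, hypothetical source maps); folding from the lowest-priority end means earlier, higher-priority sources overwrite later ones:

    def mergeByPriority(sources: Seq[Map[String, String]]): Map[String, String] =
      // sources are ordered highest priority first; apply them last so they win
      sources.foldRight(Map.empty[String, String])((src, acc) => acc ++ src)

    // e.g. mergeByPriority(Seq(cmdLineConf, confFlagConf, envConf, sysPropConf,
    //                          sparkDefaultsConf, hardCodedDefaults))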

The existing argument handling is pretty finicky - chances are high that I've missed some behaviour. If this PR is going to be accepted/approved, let me know of any bugs and I'll fix them up and document the behaviour for future reference.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tigerquoll/spark-3620 master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2516.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2516


commit b1a9682dd2bbff824c4e8481fa0ce5118c47de68
Author: Dale tigerqu...@outlook.com
Date:   2014-09-21T02:42:24Z

Initial pass at using typesafe's conf object for handling configuration 
options

commit 7bb5ee95b3f06147dba994e3d557221554415bfd
Author: Dale tigerqu...@outlook.com
Date:   2014-09-21T02:44:09Z

Added defaults file

commit e995a6d1e8ab898c85aa5fe259b81c630595075f
Author: Dale tigerqu...@outlook.com
Date:   2014-09-21T12:56:17Z

Existing tests now work

commit 00ee008c5652336d533d9619bc7e6306ed59138b
Author: Dale tigerqu...@outlook.com
Date:   2014-09-21T13:05:14Z

Existing tests now work

commit 295c62b067fb5204efb58892133c77fe49b877e0
Author: Dale tigerqu...@outlook.com
Date:   2014-09-22T22:04:45Z

Created mergedPropertyMap

commit f399170e1c05d75257ff6c508a96e64cadf0d87b
Author: Dale tigerqu...@outlook.com
Date:   2014-09-23T00:10:40Z

Moved sparkSubmitArguments module to use custom property map merging code

commit b0abe3196f9e5d3f577e158704740f1eee8fbb59
Author: Dale tigerqu...@outlook.com
Date:   2014-09-23T23:58:55Z

Merge branch 'master' of https://github.com/apache/spark

commit 562ec7c064e5ad632cf7aaa1720be29fe36b5c9a
Author: Dale tigerqu...@outlook.com
Date:   2014-09-23T23:59:52Z

note for additional tests

commit 86f71f8bb8291fe20a2f0ca0100727d583e97dfd
Author: Dale tigerqu...@outlook.com
Date:   2014-09-24T00:39:47Z

Changes needed to pass scalastyle check

commit 2019554ec307c8d3eabee7e4299cd8bac8faba0f
Author: Dale tigerqu...@outlook.com
Date:   2014-09-24T04:43:58Z

Changes needed to pass scalastyle check, merged from current 
SparkSubmit.scala

commit 8c416a04d064c1475a184785a9135d849c239bff
Author: Dale tigerqu...@outlook.com
Date:   2014-09-24T05:19:24Z

Fixed some typos

commit b69f58e65d919a689942866f59b11a7dcf2fbf91
Author: Dale tigerqu...@outlook.com
Date:   2014-09-24T07:08:01Z

Added spark.app.name to defaults list

[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18008077
  
--- Diff: 
core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
@@ -0,0 +1,90 @@
+# The master URL for the cluster eg spark://host:port, mesos://host:port, 
yarn, or local.
+# legacy env variable MASTER
+spark.master = local[*]
+
+# Should spark submit run in verbose mode: default is false
+spark.verbose = false
+
+# Comma-separated list of files to be placed in the working directory of 
each executor
+# spark.files =
+
+# Comma-separated list of .zip, .egg, or .py files to place on the 
PYTHONPATH for Python apps.
+# spark.submit.pyfiles =
+
+# Comma-separated list of local jars to include on the driver and executor 
classpaths.
+# spark.jars =
+
+
+# Path to a bundled jar including your application and all dependencies.
+# The URL must be globally visible inside of your cluster, for instance,
+# an hdfs:// path or a file:// path that is present on all nodes.
+# spark.app.primaryResource =
+
+# A name of your application.
+spark.app.name = Unknown Application
--- End diff --

Ahh, looking at the code it was hard to determine whether primaryResource was a 
required configuration item at all, which is why I defined it as an 
Option[String]. I'll remove the default value, change it back to a String, and 
add an explicit test to make sure it is defined after all new config items have 
been derived.
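
For illustration only, here is a hedged sketch of the kind of explicit 
post-resolution check described above (the helper and map names are 
hypothetical, not the PR's actual code):

// Hedged sketch: names are hypothetical.
def requireDefined(conf: Map[String, String], key: String): String =
  conf.getOrElse(key, "") match {
    case "" => sys.error(s"Required configuration item '$key' is not defined")
    case v  => v
  }

// eg, once all new config items have been derived:
// val primaryResource = requireDefined(resolvedConf, "spark.app.primaryResource")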





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18008488
  
--- Diff: 
core/src/main/resources/org/apache/spark/deploy/spark-submit-defaults.prop ---
@@ -0,0 +1,90 @@
+# The master URL for the cluster eg spark://host:port, mesos://host:port, 
yarn, or local.
+# legacy env variable MASTER
+spark.master = local[*]
+
+# Should spark submit run in verbose mode: default is false
+spark.verbose = false
+
+# Comma-separated list of files to be placed in the working directory of 
each executor
+# spark.files =
+
+# Comma-separated list of .zip, .egg, or .py files to place on the 
PYTHONPATH for Python apps.
+# spark.submit.pyfiles =
+
+# Comma-separated list of local jars to include on the driver and executor 
classpaths.
+# spark.jars =
+
+
+# Path to a bundled jar including your application and all dependencies.
+# The URL must be globally visible inside of your cluster, for instance,
+# an hdfs:// path or a file:// path that is present on all nodes.
+# spark.app.primaryResource =
+
+# A name of your application.
+spark.app.name = Unknown Application
+
+# Your application's main class (for Java / Scala apps).
+# spark.app.class =
--- End diff --

primaryResource, spark.app.name and spark.app.class have been removed from 
the file. I note in SparkSubmit:
if (mainClass == null && !isPython && primaryResource != null) {
  .. attempt to derive class from primaryResource
}
Does that mean it is valid to not have a main class defined at this stage 
if we are running Python? Or should we error out if we still do not have a main 
class after this code, regardless of whether we are running Python or not?
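
For example, a hedged sketch of the stricter check being asked about here 
(variable names follow the snippet above; this is not the PR's code):

// Hedged sketch: after trying to derive mainClass from primaryResource,
// fail for non-Python apps that still have no main class.
if (mainClass == null && !isPython) {
  printErrorAndExit("No main class specified and none could be derived " +
    "from the primary resource")
}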







[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18008765
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -48,8 +50,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
   private[spark] val settings = new HashMap[String, String]()
 
   if (loadDefaults) {
+val sparkConfigs = 
SparkSubmitArguments.mergeSparkProperties(Vector.empty)
 // Load any spark.* system properties
-for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) 
{
+for ((k, v) <- sparkConfigs
--- End diff --

I've reverted the changes in SparkConf to their original (pre-PR) state until 
approval in principle is given for the concepts behind the PR, in an attempt to 
make the PR more understandable.





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18009041
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/MergedPropertyMap.scala ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import scala.collection._
+
+
+object MergedPropertyMap {
+
+  /**
+   * Flatten a map of maps out into a single map, later maps in the 
propList
+   * have priority over older ones
+   * @param propList Vector of property maps[PropName-PropValue] to merge
+   */
+  def mergePropertyMaps( propList: Vector[Map[String, String]]): 
mutable.Map[String, String] = {
--- End diff --

fixed





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18009050
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/MergedPropertyMap.scala ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import scala.collection._
+
+
+object MergedPropertyMap {
+
+  /**
+   * Flatten a map of maps out into a single map, later maps in the 
propList
+   * have priority over older ones
+   * @param propList Vector of property maps[PropName-PropValue] to merge
+   */
+  def mergePropertyMaps( propList: Vector[Map[String, String]]): 
mutable.Map[String, String] = {
+val propMap = new mutable.HashMap[String, String]()
+// loop through each entry of each map in order of priority
+// and add it to our propMap
+propList.foreach {
+  _.foreach{ case(k,v) => if (propMap.getOrElse(k,"").size == 0) 
propMap.put(k,v)}
--- End diff --

Thanks, I missed that cleaner implementation. Changed to an immutable Map and 
moved it to utils.
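
For reference, a minimal sketch of the immutable, fold-based style being 
referred to, keeping the put-if-absent behaviour of the original loop (earlier 
maps in propList win). This is an assumption about the shape of the change, 
not the committed code:

// Hedged sketch: `m ++ acc` lets entries already accumulated from earlier
// (higher priority) maps win on conflicts.
def mergePropertyMaps(propList: Vector[Map[String, String]])
    : Map[String, String] =
  propList.foldLeft(Map.empty[String, String])((acc, m) => m ++ acc)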





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18009125
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -83,7 +88,7 @@ object SparkSubmit {
*   (4) the main class for the child
*/
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-  : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], 
String) = {
+  : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], 
String) = {
--- End diff --

fixed





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18009140
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -130,7 +135,7 @@ object SparkSubmit {
   if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && 
!Utils.isTesting) {
 printErrorAndExit(
   "Could not load YARN classes. " +
-  "This copy of Spark may not have been compiled with YARN support.")
+"This copy of Spark may not have been compiled with YARN support.")
--- End diff --

Fixed up; like the last one, I'm not sure how they got mangled.





[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...

2014-09-24 Thread tigerquoll
Github user tigerquoll commented on a diff in the pull request:

https://github.com/apache/spark/pull/2516#discussion_r18009318
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -288,11 +291,11 @@ object SparkSubmit {
   }
 
   private def launch(
-  childArgs: ArrayBuffer[String],
-  childClasspath: ArrayBuffer[String],
-  sysProps: Map[String, String],
-  childMainClass: String,
-  verbose: Boolean = false) {
+  childArgs: ArrayBuffer[String],
--- End diff --

Agreed, very weird; reverted to the original value.




