[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-08-15 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14099115#comment-14099115
 ] 

Mridul Muralidharan commented on SPARK-2089:



For the general case, won't InputFormats have customizations for 
creation and/or initialization before they can be used to get splits (other 
than file names, I mean)?
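
For reference, a minimal hedged sketch (using the stock Hadoop mapreduce API and a plain TextInputFormat - not the customized InputFormats the comment is concerned about) of how per-split preferred hosts are typically derived before any SparkContext exists:

{code}
// Hedged sketch: derive preferred hosts per split from a stock Hadoop InputFormat.
// Customized InputFormats may need extra setup before getSplits can be called,
// which is exactly the concern raised above.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import scala.collection.JavaConverters._

def preferredHostsPerSplit(paths: Seq[String]): Seq[Seq[String]] = {
  val job = Job.getInstance(new Configuration())
  paths.foreach(p => FileInputFormat.addInputPath(job, new Path(p)))
  // One entry per split: the hosts holding that split's blocks.
  new TextInputFormat().getSplits(job).asScala.map(_.getLocations.toSeq)
}
{code}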


 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
Priority: Critical

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code.  The Spark-YARN code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty, unset version.  This occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-08-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14095247#comment-14095247
 ] 

Mridul Muralidharan commented on SPARK-2089:


Since I am not maintaining the code anymore, I don't have a strong preference 
either way.
I am not sure what the format means, btw - I see multiple nodes and racks 
mentioned in the same group ...

In general though, I am not convinced it is a good direction to take.
1) It is a workaround for a design issue and has non-trivial performance 
implications (serializing into this form only to immediately deserialize it is 
expensive for large inputs - not to mention, it gets shipped to executors for 
no reason).
2) It locks us into a format which provides inadequate information - the number of 
blocks per node, size per block, etc. is lost (or maybe I just did not 
understand what the format is!).
3) We are currently investigating evolving in the opposite direction - adding more 
information so that we can be more specific about where to allocate executors.
For example: I can see a fairly near-term need to associate executors with 
accelerator cards (and break the implicit OFF_HEAP - tachyon assumption).
A string representation makes this fragile to evolve.

As I mentioned before, the current yarn allocation model in spark is a very 
naive implementation - which I did not expect to survive this long: it came 
directly from our prototype.
We really should be modifying it to consider the cost of data transfer and 
prioritize allocation that way (number of blocks on a node/rack, size of 
blocks, number of replicas available, etc.).
For small datasets on small enough clusters this is not relevant, but it has 
implications as we grow along both axes.
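
To make points (2) and (3) concrete, a purely illustrative (not Spark API) shape for richer locality data that would not be lost in a flattened string encoding:

{code}
// Illustrative only - a structured alternative to a string format, keeping per-host
// block counts and sizes so allocation can be ranked by how much data a host holds.
case class HostBlockStats(host: String, rack: Option[String], numBlocks: Int, totalBytes: Long)

// Rank candidate hosts by local data volume (most data first).
def rankHostsByLocalData(stats: Seq[HostBlockStats]): Seq[String] =
  stats.sortBy(s => -s.totalBytes).map(_.host)
{code}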

 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
Priority: Critical

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code.  The Spark-YARN code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty, unset version.  This occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092807#comment-14092807
 ] 

Mridul Muralidharan commented on SPARK-2962:


On further investigation:

a) The primary issue is a combination of SPARK-2089 and the current scheduler 
behavior for pendingTasksWithNoPrefs.
SPARK-2089 leads to very bad allocation of nodes - it particularly has an impact 
on bigger clusters.
It leads to a lot of blocks having no data- or rack-local executors - causing 
them to end up in pendingTasksWithNoPrefs.

While loading data off dfs, when an executor is being scheduled, even though 
there might be rack-local schedules available for it (or, on waiting a while, 
data-local too - see (b) below), because of current scheduler behavior, tasks 
from pendingTasksWithNoPrefs get scheduled - causing a large number of ANY 
tasks to be scheduled at the very onset.

The combination of these, with the lack of marginal alleviation via (b), is what 
caused the performance impact.

b) spark.scheduler.minRegisteredExecutorsRatio had not yet been used in the 
workload - so that might alleviate some of the non-deterministic waiting and 
ensure adequate executors are allocated! Thanks [~lirui]
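
For reference, a minimal sketch of setting the property mentioned in (b); the value is illustrative:

{code}
// Hedged example (Spark 1.1-era property named in the comment above): ask the
// scheduler to wait until ~80% of requested executors have registered before it
// starts scheduling, reducing early ANY-level assignments.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("locality-sensitive-job")
  .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
{code}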



 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2962:
--

 Summary: Suboptimal scheduling in spark
 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan



In findTask, irrespective of the 'locality' specified, pendingTasksWithNoPrefs are 
always scheduled with PROCESS_LOCAL.

pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
locations - but which could come in 'later': particularly relevant when the spark 
app is just coming up and containers are still being added.

This causes a large number of non-node-local tasks to be scheduled, incurring 
significant network transfers in the cluster when running with non-trivial 
datasets.

The comment "// Look for no-pref tasks after rack-local tasks since they can 
run anywhere." is misleading in the method code: locality levels start from 
process_local down to any, and so no-prefs get scheduled much before rack.


Also note that currentLocalityIndex is reset to the taskLocality returned by 
this method - so returning PROCESS_LOCAL as the level will trigger wait times 
again. (Was relevant before a recent change to the scheduler, and might be again 
based on the resolution of this issue.)


Found as part of writing a test for SPARK-2931
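
A simplified, hedged sketch of the ordering being described (not the actual TaskSetManager code; names and structure are illustrative):

{code}
// No-pref tasks are consulted before rack-local ones and tagged PROCESS_LOCAL,
// which is the behavior this issue calls out.
object Locality extends Enumeration { val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value }

def findTask(processLocal: Seq[Int], nodeLocal: Seq[Int], noPrefs: Seq[Int],
             rackLocal: Seq[Int], anyTasks: Seq[Int]): Option[(Int, Locality.Value)] =
  processLocal.headOption.map(i => (i, Locality.PROCESS_LOCAL))
    .orElse(nodeLocal.headOption.map(i => (i, Locality.NODE_LOCAL)))
    // No-pref tasks are picked here - before rack-local - and reported as
    // PROCESS_LOCAL, which also resets the locality wait (currentLocalityIndex).
    .orElse(noPrefs.headOption.map(i => (i, Locality.PROCESS_LOCAL)))
    .orElse(rackLocal.headOption.map(i => (i, Locality.RACK_LOCAL)))
    .orElse(anyTasks.headOption.map(i => (i, Locality.ANY)))
{code}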
 



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092427#comment-14092427
 ] 

Mridul Muralidharan commented on SPARK-2962:


To give more context:

a) Our jobs start with loading data from dfs as the starting point - so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case the cluster is 
busy, etc.) - unfortunately, this is not sufficient, and iirc there is no 
programmatic way to wait more deterministically for X% of nodes (was something 
added to alleviate this? I did see some discussion)

c) This becomes more of a problem because spark does not honour preferred 
locations anymore while running on yarn. See SPARK-2089 - due to 1.0 interface 
changes.
[ Practically, if we are using a large enough number of nodes (with replication 
of 3 or higher), we usually do end up with quite a lot of data-local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in the general case ]
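
For (b), a hedged sketch of a programmatic wait, using the Spark 1.x-era getExecutorStorageStatus developer API (treat the API choice and the driver-counting detail as assumptions):

{code}
// Poll until roughly targetFraction of the expected executors have registered,
// instead of a fixed one-minute sleep. getExecutorStorageStatus includes the driver,
// hence the "- 1".
import org.apache.spark.SparkContext

def waitForExecutors(sc: SparkContext, expected: Int, targetFraction: Double,
                     timeoutMs: Long = 120000L, pollMs: Long = 1000L): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (sc.getExecutorStorageStatus.length - 1 < expected * targetFraction &&
         System.currentTimeMillis() < deadline) {
    Thread.sleep(pollMs)
  }
}
{code}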



 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092430#comment-14092430
 ] 

Mridul Muralidharan commented on SPARK-2962:


Hi [~matei],

  I am referencing the latest code (as of yesterday night).

pendingTasksWithNoPrefs currently contains both tasks which truly have no 
preference, and tasks whose preferences are unavailable - and the 
latter is what is triggering this, since that can change during the execution 
of the stage.
Hope I am not missing something?

Thanks,
Mridul

 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092431#comment-14092431
 ] 

Mridul Muralidharan commented on SPARK-2962:


Note, I don't think this is a regression in 1.1 - it probably existed much 
earlier too.
Other issues are making us notice this (like SPARK-2089) - we moved to 1.1 from 
0.9 recently.

 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2962) Suboptimal scheduling in spark

2014-08-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14092427#comment-14092427
 ] 

Mridul Muralidharan edited comment on SPARK-2962 at 8/11/14 4:35 AM:
-

To give more context:

a) Our jobs start with loading data from dfs as the starting point - so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case the cluster is 
busy, etc.) - unfortunately, this is not sufficient, and iirc there is no 
programmatic way to wait more deterministically for X% of nodes (was something 
added to alleviate this? I did see some discussion)

c) This becomes more of a problem because spark does not honour preferred 
locations anymore while running on yarn. See SPARK-2089 - due to 1.0 interface 
changes.
[ Practically, if we are using a large enough number of nodes (with replication 
of 3 or higher), we usually do end up with quite a lot of data-local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in the general case ]




was (Author: mridulm80):
To give more context; 

a) Our jobs start with load data from dfs as starting point : and so this is 
the first stage that gets executed.

b) We are sleeping for 1 minute before starting the jobs (in case cluster is 
busy, etc) - unfortunately, this is not sufficient and iirc there is no 
programmatic way to wait more deterministically for X% of node (was something 
added to alleviate this ? I did see some discussion)

c) This becomes more of a problem because spark does not honour preferred 
location anymore while running in yarn. See SPARK-208 - due to 1.0 interface 
changes.
[ Practically, if we are using large enough number of nodes (with replication 
of 3 or higher), usually we do end up with quite of lot of data local tasks 
eventually - so (c) is not an immediate concern for our current jobs assuming 
(b) is not an issue, though it is suboptimal in general case ]



 Suboptimal scheduling in spark
 --

 Key: SPARK-2962
 URL: https://issues.apache.org/jira/browse/SPARK-2962
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan

 In findTask, irrespective of 'locality' specified, pendingTasksWithNoPrefs 
 are always scheduled with PROCESS_LOCAL
 pendingTasksWithNoPrefs contains tasks which currently do not have any alive 
 locations - but which could come in 'later' : particularly relevant when 
 spark app is just coming up and containers are still being added.
 This causes a large number of non node local tasks to be scheduled incurring 
 significant network transfers in the cluster when running with non trivial 
 datasets.
 The comment // Look for no-pref tasks after rack-local tasks since they can 
 run anywhere. is misleading in the method code : locality levels start from 
 process_local down to any, and so no prefs get scheduled much before rack.
 Also note that, currentLocalityIndex is reset to the taskLocality returned by 
 this method - so returning PROCESS_LOCAL as the level will trigger wait times 
 again. (Was relevant before recent change to scheduler, and might be again 
 based on resolution of this issue).
 Found as part of writing test for SPARK-2931
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091746#comment-14091746
 ] 

Mridul Muralidharan commented on SPARK-2931:


[~kayousterhout] this is weird - I remember mentioning this exact same issue in 
some PR for 1.1 (trying to find which one, though not 1313 iirc); and I think 
it was supposed to have been addressed.
We had observed this issue of currentLocalityLevel running away when we had 
internally merged the PR.

Strange that it was not addressed - speaks volumes of me not following up on my 
reviews!

 getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
 ---

 Key: SPARK-2931
 URL: https://issues.apache.org/jira/browse/SPARK-2931
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
 benchmark
Reporter: Josh Rosen
Priority: Blocker
 Fix For: 1.1.0


 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
 I get the following errors (one per task):
 {code}
 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
 bytes)
 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
  with ID 0
 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
   at 
 org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 {code}
 This causes the job to hang.
 I can deterministically reproduce this by re-running the test, either in 
 isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-2931:
---

Attachment: test.patch

A patch to showcase the exception

 getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
 ---

 Key: SPARK-2931
 URL: https://issues.apache.org/jira/browse/SPARK-2931
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
 benchmark
Reporter: Josh Rosen
Priority: Blocker
 Fix For: 1.1.0

 Attachments: scala-sort-by-key.err, test.patch


 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
 I get the following errors (one per task):
 {code}
 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
 bytes)
 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
  with ID 0
 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
   at 
 org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 {code}
 This causes the job to hang.
 I can deterministically reproduce this by re-running the test, either in 
 isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException

2014-08-09 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091881#comment-14091881
 ] 

Mridul Muralidharan commented on SPARK-2931:


[~joshrosen] [~kayousterhout] Added a patch which deterministically showcases 
the bug - should be easy to fix it now I hope :-)

 getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
 ---

 Key: SPARK-2931
 URL: https://issues.apache.org/jira/browse/SPARK-2931
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf 
 benchmark
Reporter: Josh Rosen
Priority: Blocker
 Fix For: 1.1.0

 Attachments: scala-sort-by-key.err, test.patch


 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, 
 I get the following errors (one per task):
 {code}
 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 
 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 
 bytes)
 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered 
 executor: 
 Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036]
  with ID 0
 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475)
   at 
 org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153)
   at 
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 {code}
 This causes the job to hang.
 I can deterministically reproduce this by re-running the test, either in 
 isolation or as part of the full performance testing suite.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Unit tests in 5 minutes

2014-08-09 Thread Mridul Muralidharan
The issue with supporting this, imo, is the fact that ScalaTest uses the
same VM for all the tests (the surefire plugin supports forking, but
ScalaTest ignores it iirc).
So different tests would initialize different SparkContexts, and could
potentially step on each other's toes.

Regards,
Mridul


On Fri, Aug 8, 2014 at 9:31 PM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 Howdy,

 Do we think it's both feasible and worthwhile to invest in getting our unit
 tests to finish in under 5 minutes (or something similarly brief) when run
 by Jenkins?

 Unit tests currently seem to take anywhere from 30 min to 2 hours. As
 people add more tests, I imagine this time will only grow. I think it would
 be better for both contributors and reviewers if they didn't have to wait
 so long for test results; PR reviews would be shorter, if nothing else.

 I don't know how this is normally done, but maybe it wouldn't be too
 much work to get a test cycle to feel lighter.

 Most unit tests are independent and can be run concurrently, right? Would
 it make sense to build a given patch on many servers at once and send
 disjoint sets of unit tests to each?

 I'd be interested in working on something like that if possible (and
 sensible).

 Nick

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[jira] [Commented] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp

2014-08-06 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14088018#comment-14088018
 ] 

Mridul Muralidharan commented on SPARK-2881:


To add, this will affect spark whenever the tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, yarn-client and standalone modes should be affected by default (unless I 
missed something in the scripts).
I am not very familiar with how mesos runs jobs, so can't comment on that - anyone 
care to add?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under java.io.tmpdir, once only, as part of spark 
startup - which will cause snappy to use that directory and avoid this 
issue.


Since snappy is the default codec now, I am marking this as a blocker for 
release.
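
A minimal sketch of that workaround, assuming the 'org.xerial.snappy.tempdir' system property named above is consulted by snappy-java before it extracts its native library:

{code}
// Set a per-process random temp dir for snappy-java once at startup, so concurrent
// users on the same machine don't collide on a /tmp/snappy-*.so owned by someone else.
import java.nio.file.{Files, Paths}

def setSnappyTempDirOnce(): Unit = {
  if (System.getProperty("org.xerial.snappy.tempdir") == null) {
    val dir = Files.createTempDirectory(
      Paths.get(System.getProperty("java.io.tmpdir")), "spark-snappy-")
    System.setProperty("org.xerial.snappy.tempdir", dir.toString)
  }
}
{code}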

 Snappy is now default codec - could lead to conflicts since uses /tmp
 -

 Key: SPARK-2881
 URL: https://issues.apache.org/jira/browse/SPARK-2881
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Thomas Graves
Priority: Blocker

 I was using spark master branch and I ran into an issue with Snappy since its 
 now the default codec for shuffle. 
 The issue was that someone else had run with snappy and it created 
 /tmp/snappy-*.so but it had restrictive permissions so I was not able to use 
 it or remove it.   This caused my spark job to not start.  
 I was running in yarn client mode at the time.  Yarn cluster mode shouldn't 
 have this issue since we change the java.io.tmpdir. 
 I assume this would also affect standalone mode.
 I'm not sure if this is a true blocker but wanted to file it as one at first 
 and let us decide.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp

2014-08-06 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14088018#comment-14088018
 ] 

Mridul Muralidharan edited comment on SPARK-2881 at 8/6/14 6:45 PM:


To add, this will affect spark whenever the tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, yarn-client and standalone modes should be affected by default (unless I 
missed something in the scripts).
I am not very familiar with how mesos runs jobs, so can't comment on that - anyone 
care to add?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under java.io.tmpdir, once only, as part of spark 
startup - which will cause snappy to use that directory and avoid this 
issue.


Since snappy is the default codec now, I am +1 on marking this as a blocker for 
release.


was (Author: mridulm80):
To add, this will affect spark whenever tmp directory is not overridden via 
java.io.tmpdir to something ephemeral.

So local, yarn client, standalone should be affected by default (unless I 
missed something in the scripts).
I am not very of how mesos runs jobs, so cant comment about that, anyone care 
to add ?


A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a 
randomly generated directory under java.io.tmpdir as part of spark startup 
(only) once : which will cause snappy to use that directory and avoid this 
issue. 


Since snappy is the default codec now, I am marking this as a blocker for 
release

 Snappy is now default codec - could lead to conflicts since uses /tmp
 -

 Key: SPARK-2881
 URL: https://issues.apache.org/jira/browse/SPARK-2881
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Thomas Graves
Priority: Blocker

 I was using spark master branch and I ran into an issue with Snappy since its 
 now the default codec for shuffle. 
 The issue was that someone else had run with snappy and it created 
 /tmp/snappy-*.so but it had restrictive permissions so I was not able to use 
 it or remove it.   This caused my spark job to not start.  
 I was running in yarn client mode at the time.  Yarn cluster mode shouldn't 
 have this issue since we change the java.io.tmpdir. 
 I assume this would also affect standalone mode.
 I'm not sure if this is a true blocker but wanted to file it as one at first 
 and let us decide.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: -1s on pull requests?

2014-08-05 Thread Mridul Muralidharan
Just came across this mail - thanks for initiating this discussion, Kay.
To add: another issue which recurs is very rapid commits, before most
contributors have had a chance to even look at the changes proposed.
There is not much prior discussion on the jira or pr, and the time
between submitting the PR and committing it is less than 12 hours.

Particularly relevant when contributors are not in US timezones and/or
colocated; I have raised this a few times before, when the commit had
other side effects that were not considered.
On the flip side we have PRs which have been languishing for weeks with
little or no activity from the committers' side - making the contribution
stale; so too long a delay is definitely not the direction to
take either!



Regards,
Mridul



On Tue, Jul 22, 2014 at 2:14 AM, Kay Ousterhout k...@eecs.berkeley.edu wrote:
 Hi all,

 As the number of committers / contributors on Spark has increased, there
 are cases where pull requests get merged before all the review comments
 have been addressed. This happens say when one committer points out a
 problem with the pull request, and another committer doesn't see the
 earlier comment and merges the PR before the comment has been addressed.
  This is especially tricky for pull requests with a large number of
 comments, because it can be difficult to notice early comments describing
 blocking issues.

 This also happens when something accidentally gets merged after the tests
 have started but before tests have passed.

 Do folks have ideas on how we can handle this issue? Are there other
 projects that have good ways of handling this? It looks like for Hadoop,
 people can -1 / +1 on the JIRA.

 -Kay

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



[jira] [Commented] (SPARK-2685) Update ExternalAppendOnlyMap to avoid buffer.remove()

2014-07-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074186#comment-14074186
 ] 

Mridul Muralidharan commented on SPARK-2685:


We moved to using java.util.LinkedList for this

 Update ExternalAppendOnlyMap to avoid buffer.remove()
 -

 Key: SPARK-2685
 URL: https://issues.apache.org/jira/browse/SPARK-2685
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Reporter: Matei Zaharia

 This shifts the whole right side of the array back, which can be expensive. 
 It would be better to just swap the last element into the position we want to 
 remove at, then decrease the size of the array.
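
A hedged sketch of the swap-remove idea described above (order is not preserved; names are illustrative, not the eventual change):

{code}
// Overwrite the removed slot with the last element and shrink by one,
// avoiding ArrayBuffer.remove(index)'s O(n) shift of the right-hand side.
import scala.collection.mutable.ArrayBuffer

def swapRemove[T](buf: ArrayBuffer[T], index: Int): T = {
  val removed = buf(index)
  buf(index) = buf(buf.length - 1) // move last element into the freed slot
  buf.trimEnd(1)                   // drop the (now duplicated) last slot
  removed
}
{code}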



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2532) Fix issues with consolidated shuffle

2014-07-16 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2532:
--

 Summary: Fix issues with consolidated shuffle
 Key: SPARK-2532
 URL: https://issues.apache.org/jira/browse/SPARK-2532
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
 Environment: All
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
 Fix For: 1.1.0



Will file a PR with the changes as soon as the merge is done (the earlier merge 
became outdated in 2 weeks unfortunately :) ).

Consolidated shuffle is broken in multiple ways in spark:

a) Task failure(s) can cause the state to become inconsistent.

b) Multiple reverts, or a combination of close/revert/close, can cause the state 
to be inconsistent.
(As part of exception/error handling.)

c) Some of the API in the block writer causes implementation issues - for example: 
a revert is always followed by a close, but the implementation tries to keep them 
separate, resulting in surface for errors.

d) Fetching data from consolidated shuffle files can go badly wrong if the file 
is being actively written to: it computes the length by subtracting the next offset 
from the current offset (or the file length if this is the last offset) - the latter 
fails when a fetch happens in parallel with a write.
Note, this happens even if there are no task failures of any kind!
This usually results in stream corruption or decompression errors.




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060543#comment-14060543
 ] 

Mridul Muralidharan commented on SPARK-2468:


We map the file content and directly write that to the socket (except when the 
size is below 8k or so iirc) - are you sure we are copying to user space and 
back ?

 zero-copy shuffle network communication
 ---

 Key: SPARK-2468
 URL: https://issues.apache.org/jira/browse/SPARK-2468
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
Priority: Critical

 Right now shuffle send goes through the block manager. This is inefficient 
 because it requires loading a block from disk into a kernel buffer, then into 
 a user space buffer, and then back to a kernel send buffer before it reaches 
 the NIC. It does multiple copies of the data and context switching between 
 kernel/user. It also creates unnecessary buffer in the JVM that increases GC
 Instead, we should use FileChannel.transferTo, which handles this in the 
 kernel space with zero-copy. See 
 http://www.ibm.com/developerworks/library/j-zerocopy/
 One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060545#comment-14060545
 ] 

Mridul Muralidharan commented on SPARK-2468:


Writing mmap'ed buffers is pretty efficient btw - it is the second fallback in the 
transferTo implementation iirc.

 zero-copy shuffle network communication
 ---

 Key: SPARK-2468
 URL: https://issues.apache.org/jira/browse/SPARK-2468
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
Priority: Critical

 Right now shuffle send goes through the block manager. This is inefficient 
 because it requires loading a block from disk into a kernel buffer, then into 
 a user space buffer, and then back to a kernel send buffer before it reaches 
 the NIC. It does multiple copies of the data and context switching between 
 kernel/user. It also creates unnecessary buffer in the JVM that increases GC
 Instead, we should use FileChannel.transferTo, which handles this in the 
 kernel space with zero-copy. See 
 http://www.ibm.com/developerworks/library/j-zerocopy/
 One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication

2014-07-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14061094#comment-14061094
 ] 

Mridul Muralidharan commented on SPARK-2468:



Ah, small files - those are indeed a problem.

Btw, we do dispose of mmap'ed blocks as soon as we are done with them; so we don't 
need to wait for gc to free them. Also note that the files are closed as soon as they 
are opened and mmap'ed - so they do not count towards the open file count/ulimit.

Agree on 1, 3 and 4 - some of these apply to sendfile too btw, so they are not 
avoidable; but it is the best we have right now.
Since we use mmap'ed buffers and rarely transfer the same file again, the 
performance jump might not be the order(s) of magnitude other projects claim - 
but then even a 10% (or whatever) improvement in our case would be substantial!
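
For reference, a hedged sketch of the two transfer paths under discussion, using plain JDK NIO (not Spark's ConnectionManager code):

{code}
// transferTo keeps the copy in kernel space (sendfile where available); writing a
// memory-mapped buffer to the channel is the fallback path mentioned above.
import java.io.RandomAccessFile
import java.nio.channels.{FileChannel, WritableByteChannel}

def sendZeroCopy(file: RandomAccessFile, out: WritableByteChannel): Long =
  file.getChannel.transferTo(0, file.length(), out)

def sendViaMmap(file: RandomAccessFile, out: WritableByteChannel): Int = {
  val buf = file.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, file.length())
  out.write(buf) // write the mapped buffer directly to the channel
}
{code}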

 zero-copy shuffle network communication
 ---

 Key: SPARK-2468
 URL: https://issues.apache.org/jira/browse/SPARK-2468
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Reporter: Reynold Xin
Assignee: Reynold Xin
Priority: Critical

 Right now shuffle send goes through the block manager. This is inefficient 
 because it requires loading a block from disk into a kernel buffer, then into 
 a user space buffer, and then back to a kernel send buffer before it reaches 
 the NIC. It does multiple copies of the data and context switching between 
 kernel/user. It also creates unnecessary buffer in the JVM that increases GC
 Instead, we should use FileChannel.transferTo, which handles this in the 
 kernel space with zero-copy. See 
 http://www.ibm.com/developerworks/library/j-zerocopy/
 One potential solution is to use Netty NIO.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: better compression codecs for shuffle blocks?

2014-07-14 Thread Mridul Muralidharan
We tried with a lower block size for LZF, but it barfed all over the place.
Snappy was the way to go for our jobs.


Regards,
Mridul


On Mon, Jul 14, 2014 at 12:31 PM, Reynold Xin r...@databricks.com wrote:
 Hi Spark devs,

 I was looking into the memory usage of shuffle and one annoying thing is
 the default compression codec (LZF) is that the implementation we use
 allocates buffers pretty generously. I did a simple experiment and found
 that creating 1000 LZFOutputStream allocated 198976424 bytes (~190MB). If
 we have a shuffle task that uses 10k reducers and 32 threads running
 currently, the memory used by the lzf stream alone would be ~ 60GB.

 In comparison, Snappy only allocates ~ 65MB for every
 1k SnappyOutputStream. However, Snappy's compression is slightly lower than
 LZF's. In my experience, it leads to 10 - 20% increase in size. Compression
 ratio does matter here because we are sending data across the network.

 In future releases we will likely change the shuffle implementation to open
 less streams. Until that happens, I'm looking for compression codec
 implementations that are fast, allocate small buffers, and have decent
 compression ratio.

 Does anybody on this list have any suggestions? If not, I will submit a
 patch for 1.1 that replaces LZF with Snappy for the default compression
 codec to lower memory usage.


 allocation data here: https://gist.github.com/rxin/ad7217ea60e3fb36c567
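
For what it's worth, a hedged example of trading per-stream buffer size for compression ratio with snappy-java (constructor as I recall it; verify against the version in use):

{code}
// A smaller block size shrinks the per-stream buffer at some cost in compression
// ratio - relevant when thousands of shuffle streams are open at once.
import java.io.ByteArrayOutputStream
import org.xerial.snappy.SnappyOutputStream

val sink = new ByteArrayOutputStream()
val out = new SnappyOutputStream(sink, 32 * 1024) // 32 KB block size (illustrative)
{code}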


[jira] [Commented] (SPARK-2398) Trouble running Spark 1.0 on Yarn

2014-07-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060113#comment-14060113
 ] 

Mridul Muralidharan commented on SPARK-2398:



As discussed in the PR, I am attempting to list the various factors which 
contribute to overhead.
Note, this is not exhaustive (yet) - please add more to this JIRA - so that 
when we are reasonably sure, we can model the expected overhead based on these 
factors.

These factors are typically off-heap - since anything within the heap is budgeted 
for by Xmx and enforced by the VM - and so should ideally (though not always in 
practice, see gc overheads) not exceed the Xmx value.

1) 256 KB per socket accepted via ConnectionManager for inter-worker comm 
(setReceiveBufferSize)
Typically, there will be (numExecutor - 1) number of sockets open.

2) 128 KB per socket for writing output to dfs. For reads, this does not seem 
to be configured - and should be 8k per socket iirc.
Typically 1 per executor at a given point in time ?

3) 256k for each akka socket for send/receive buffer.
One per worker ? (to talk to master) - so 512kb ? Any other use of akka ?

4) If I am not wrong, netty might allocate multiple spark.akka.frameSize-sized 
direct buffers. There might be a few of these allocated and pooled/reused.
I did not go in detail into netty code though. If someone else with more 
knowhow can clarify, that would be great !
Default size of 10mb for spark.akka.frameSize

5) The default size of the assembled spark jar is about 12x mb (and changing) - 
though not all classes get loaded, the overhead would be some function of this.
The actual footprint would be higher than the on-disk size.
IIRC this is outside of the heap - [~sowen], any comments on this ? I have not 
looked into these in like 10 years now !

6) Per thread (Xss) overhead of 1mb (for 64bit vm).
Last I recall, we have about 220 odd threads - not sure if this was at the 
master or on the workers.
Ofcourse, this is dependent on the various threadpools we use (io, computation, 
etc), akka and netty config, etc.

7) Disk read overhead.
Thanks to [~pwendell]'s fix, at least for small files the overhead is not too 
high - since we do not mmap those files but read them directly.
But for anything larger than 8kb (default), we use memory mapped buffers.
The actual overhead depends on the number of files opened for read via 
DiskStore - and the entire file contents get mmap'ed into virt mem.
Note that there is some non-virt-mem overhead also at native level for these 
buffers.

The actual number of files opened should be carefully tracked to understand the 
effect of this on spark overhead, since this aspect is changing a lot of late.
The impact is on shuffle and disk-persisted RDDs, among others.
The actual value would be application dependent (how large the data is !)


8) The overhead introduced by VM not being able to reclaim memory completely 
(the cost of moving data vs amount of space reclaimed).
Ideally, this should be low - but would be dependent on the heap space, 
collector used, among other things.
I am not very knowledgeable about recent advances in gc collectors, so I 
hesitate to put a number to this.



I am sure this is not an exhaustive list, please do add to this.
In our case specifically, and [~tgraves] could add more, the number of 
containers can be high (300+ is easily possible), memory per container is 
modest (8gig usually).
To add details of observed overhead patterns (from the PR discussion) - 
a) I have had inhouse GBDT impl run without customizing overhead (so default of 
384 mb) with 12gb container and 22 nodes on reasonably large dataset.
b) I have had to customize overhead to 1.7gb for collaborative filtering with 
8gb container and 300 nodes (on a fairly large dataset).
c) I have had to minimally customize overhead to do inhouse QR factorization of 
a 50k x 50k distributed dense matrix on 45 nodes at 12 gb each (this was 
incorrectly specified in the PR discussion).
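
A hedged example of the knob these observations feed into, using the 1.x-era YARN property name as I recall it (treat the exact name and unit as assumptions for other versions):

{code}
// Raise the per-container off-heap allowance when the default (384 MB at the time)
// is too small - e.g. case (b) above needed roughly 1.7 GB on an 8 GB container.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "1792") // MB (assumed unit)
{code}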

 Trouble running Spark 1.0 on Yarn 
 --

 Key: SPARK-2398
 URL: https://issues.apache.org/jira/browse/SPARK-2398
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Nishkam Ravi

 Trouble running workloads in Spark-on-YARN cluster mode for Spark 1.0. 
 For example: SparkPageRank when run in standalone mode goes through without 
 any errors (tested for up to 30GB input dataset on a 6-node cluster).  Also 
 runs fine for a 1GB dataset in yarn cluster mode. Starts to choke (in yarn 
 cluster mode) as the input data size is increased. Confirmed for 16GB input 
 dataset.
 The same workload runs fine with Spark 0.9 in both standalone and yarn 
 cluster mode (for up to 30 GB input dataset on a 6-node cluster).
 Commandline used:
 (/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit

Re: [GitHub] spark pull request: Modify default YARN memory_overhead-- from an ...

2014-07-13 Thread Mridul Muralidharan
You are lucky :-) For some of our jobs, in an 8gb container, the overhead is
1.8gb!
On 13-Jul-2014 2:40 pm, nishkamravi2 g...@git.apache.org wrote:

 Github user nishkamravi2 commented on the pull request:

 https://github.com/apache/spark/pull/1391#issuecomment-48835560

 Sean, the memory_overhead is fairly substantial. More than 2GB for a
 30GB executor. Less than 400MB for a 2GB executor.


 ---
 If your project is set up for it, you can reply to this email and have your
 reply appear on GitHub as well. If your project does not have this feature
 enabled and wishes so, or if the feature is enabled but not working, please
 contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
 with INFRA.
 ---



Unresponsive to PR/jira changes

2014-07-09 Thread Mridul Muralidharan
Hi,


  I noticed today that gmail has been marking most of the mails from
spark github/jira I was receiving as spam; and I was assuming
it was a lull in activity due to Spark Summit for the past few weeks!

In case I have commented on specific PR/JIRA issues and not followed
up, apologies for the same - please do reach out in case something is still
pending from my end.



Regards,
Mridul


Re: on shark, is tachyon less efficient than memory_only cache strategy ?

2014-07-08 Thread Mridul Muralidharan
You are ignoring serde costs :-)

- Mridul

On Tue, Jul 8, 2014 at 8:48 PM, Aaron Davidson ilike...@gmail.com wrote:
 Tachyon should only be marginally less performant than memory_only, because
 we mmap the data from Tachyon's ramdisk. We do not have to, say, transfer
 the data over a pipe from Tachyon; we can directly read from the buffers in
 the same way that Shark reads from its in-memory columnar format.



 On Tue, Jul 8, 2014 at 1:18 AM, qingyang li liqingyang1...@gmail.com
 wrote:

 hi, when I create a table, I can set the cache strategy using shark.cache.
 I think shark.cache=memory_only means the data are managed by Spark and live
 in the same JVM as the executor, while shark.cache=tachyon means the data are
 managed by Tachyon, which is off heap, so the data are not in the same JVM as
 the executor and Spark will load data from Tachyon for each SQL query.
 So, is tachyon less efficient than the memory_only cache strategy?
 If yes, can we have Spark load all the data from Tachyon once for all SQL
 queries? I want to use the tachyon cache strategy since tachyon is more HA
 than memory_only.



[jira] [Commented] (SPARK-2390) Files in staging directory cannot be deleted and wastes the space of HDFS

2014-07-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14054162#comment-14054162
 ] 

Mridul Muralidharan commented on SPARK-2390:


Here, and in a bunch of other places, Spark currently closes the FileSystem 
instance: this is incorrect and should not be done.
The fix is to remove the fs.close, not to force creation of new instances.
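
A minimal sketch of what that fix looks like in FileLogger.stop (the method body
is taken from the snippet quoted below, with the close removed; the comment is mine):

{code:title=FileLogger.stop - sketch of the proposed fix}
def stop() {
  hadoopDataStream.foreach(_.close())
  writer.foreach(_.close())
  // Do NOT close fileSystem here: FileSystem.get hands out a JVM-wide cached
  // instance, so closing it breaks later users of the same FileSystem, such as
  // ApplicationMaster#cleanupStagingDir.
}
{code}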

 Files in staging directory cannot be deleted and wastes the space of HDFS
 -

 Key: SPARK-2390
 URL: https://issues.apache.org/jira/browse/SPARK-2390
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0, 1.0.1
Reporter: Kousuke Saruta

 When running jobs with YARN Cluster mode and using HistoryServer, the files 
 in the Staging Directory cannot be deleted.
 HistoryServer uses the directory where the event log is written, and the directory is 
 represented as an instance of o.a.h.f.FileSystem created by using 
 FileSystem.get.
 {code:title=FileLogger.scala}
 private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
 {code}
 {code:title=Utils.getHadoopFileSystem}
 def getHadoopFileSystem(path: URI): FileSystem = {
   FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
 }
 {code}
 On the other hand, ApplicationMaster has an instance named fs, which is also 
 created by using FileSystem.get.
 {code:title=ApplicationMaster}
 private val fs = FileSystem.get(yarnConf)
 {code}
 FileSystem.get returns the same cached instance when the URI passed to the method 
 represents the same file system and the method is called by the same user.
 Because of this behavior, when the directory for the event log is on HDFS, the fs of 
 ApplicationMaster and the fileSystem of FileLogger are the same instance.
 When shutting down ApplicationMaster, fileSystem.close is called in 
 FileLogger#stop, which is invoked by SparkContext#stop indirectly.
 {code:title=FileLogger.stop}
 def stop() {
   hadoopDataStream.foreach(_.close())
   writer.foreach(_.close())
   fileSystem.close()
 }
 {code}
 ApplicationMaster#cleanupStagingDir is also called by a JVM shutdown hook. In 
 this method, fs.delete(stagingDirPath) is invoked. 
 Because fs.delete in ApplicationMaster is called after fileSystem.close in 
 FileLogger, fs.delete fails and the files in the staging directory are not 
 deleted.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052275#comment-14052275
 ] 

Mridul Muralidharan commented on SPARK-2277:


Hmm, good point - that PR does change the scheduler expectations in a lot of 
ways which were not all anticipated.
Let me go through the current PR; thanks for the bug report!

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the task's 
 preferred location is available. Regarding a RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect, as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack and provide an 
 API for TaskSetManager to check whether there's a host alive on a specific rack.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052289#comment-14052289
 ] 

Mridul Muralidharan commented on SPARK-2017:


With aggregated metrics, we lose the ability to check gc time (which is 
actually what I use that UI for, other than to dig up exceptions on failed 
tasks).

 web ui stage page becomes unresponsive when the number of tasks is large
 

 Key: SPARK-2017
 URL: https://issues.apache.org/jira/browse/SPARK-2017
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin
  Labels: starter

 {code}
 sc.parallelize(1 to 1000000, 1000000).count()
 {code}
 The above code creates one million tasks to be executed. The stage detail web 
 ui page takes forever to load (if it ever completes).
 There are again a few different alternatives:
 0. Limit the number of tasks we show.
 1. Pagination
 2. By default only show the aggregate metrics and failed tasks, and hide the 
 successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-07-04 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052679#comment-14052679
 ] 

Mridul Muralidharan commented on SPARK-2017:


Sounds great: the ability to get to currently running tasks (to check current 
state), the ability to get to task failures (usually to debug), some aggregate 
stats (gc, stats per executor, etc), and some way to get to the full details 
(which is what is shown currently) in an advanced or full view.
Anything else would be a bonus :-)

 web ui stage page becomes unresponsive when the number of tasks is large
 

 Key: SPARK-2017
 URL: https://issues.apache.org/jira/browse/SPARK-2017
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin
  Labels: starter

 {code}
 sc.parallelize(1 to 1000000, 1000000).count()
 {code}
 The above code creates one million tasks to be executed. The stage detail web 
 ui page takes forever to load (if it ever completes).
 There are again a few different alternatives:
 0. Limit the number of tasks we show.
 1. Pagination
 2. By default only show the aggregate metrics and failed tasks, and hide the 
 successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-04 Thread Mridul Muralidharan
In our clusters, the number of containers we can get is high but the memory
per container is low: which is why avg_nodes_not_hosting data is
rarely zero for ML tasks :-)

To update - to unblock our current implementation efforts, we went
with broadcast, since it is intuitively easier and a minimal change, and
we compress the array as bytes in the TaskResult.
This is then stored in disk-backed maps - to remove memory pressure on
the master and workers (else MapOutputTracker becomes a memory hog).

But I agree, a compressed bitmap to represent 'large' blocks (anything
larger than maxBytesInFlight, actually), and probably one to track
non-zero blocks, should be fine (we should not really track zero output
for a reducer - that is just a waste of space).
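
For what it's worth, a minimal sketch of the "track which blocks are non-empty"
idea is below, using java.util.BitSet as a stand-in for a compressed bitmap; the
class and method names are hypothetical, not Spark's MapStatus.

{code:title=Sketch: per-map bitmap of non-empty reducer blocks (names are hypothetical)}
import java.util.BitSet

// One bit per reducer partition: set if this map task wrote a non-empty block for it.
class NonEmptyBlocks(numReducers: Int) extends Serializable {
  private val bits = new BitSet(numReducers)
  def markNonEmpty(reduceId: Int): Unit = bits.set(reduceId)
  def isNonEmpty(reduceId: Int): Boolean = bits.get(reduceId)
}

// A reducer would then skip fetch requests for blocks known to be empty:
//   if (status.isNonEmpty(reduceId)) enqueueFetch(mapId, reduceId)
{code}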


Regards,
Mridul

On Fri, Jul 4, 2014 at 3:43 AM, Reynold Xin r...@databricks.com wrote:
 Note that in my original proposal, I was suggesting we could track whether
 block size = 0 using a compressed bitmap. That way we can still avoid
 requests for zero-sized blocks.



 On Thu, Jul 3, 2014 at 3:12 PM, Reynold Xin r...@databricks.com wrote:

 Yes, that number is likely == 0 in any real workload ...


 On Thu, Jul 3, 2014 at 8:01 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

 On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin r...@databricks.com wrote:
  On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan mri...@gmail.com
  wrote:
 
 
  
   The other thing we do need is the location of blocks. This is
 actually
  just
   O(n) because we just need to know where the map was run.
 
  For well partitioned data, wont this not involve a lot of unwanted
  requests to nodes which are not hosting data for a reducer (and lack
  of ability to throttle).
 
 
  Was that a question? (I'm guessing it is). What do you mean exactly?


 I was not sure if I understood the proposal correctly - hence the
 query : if I understood it right - the number of wasted requests goes
 up by num_reducers * avg_nodes_not_hosting data.

 Ofcourse, if avg_nodes_not_hosting data == 0, then we are fine !

 Regards,
 Mridul





[jira] [Created] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler

2014-07-03 Thread Mridul Muralidharan (JIRA)
Mridul Muralidharan created SPARK-2353:
--

 Summary: ArrayIndexOutOfBoundsException in scheduler
 Key: SPARK-2353
 URL: https://issues.apache.org/jira/browse/SPARK-2353
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Mridul Muralidharan
Priority: Blocker



I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and ignore ones which are not applicable) have resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime, 
actually) seems to be assuming 
a) a constant population of locality levels.
b) probably also immutability/repeatability of locality levels

These no longer hold.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is highly suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
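
A sketch of the kind of guard that would avoid the out-of-bounds access follows;
the helper name is hypothetical and this is not the actual fix, just an
illustration of re-clamping a cached index when the level set shrinks.

{code:title=Sketch: re-clamping a cached locality index (hypothetical helper, not the actual fix)}
object LocalityIndexGuard {
  // If the set of valid locality levels can shrink between calls (e.g. when
  // executors are lost and the levels are recomputed), any cached index into
  // that array must be re-clamped before use to avoid an ArrayIndexOutOfBoundsException.
  def clamp(cachedIndex: Int, numLevels: Int): Int =
    math.max(0, math.min(cachedIndex, numLevels - 1))
}
{code}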



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-03 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051575#comment-14051575
 ] 

Mridul Muralidharan commented on SPARK-2277:


I have not rechecked the code, but the way it was originally written by me 
was:

a) Task preference is decoupled from availability of the node.
For example, we need not have an executor on a host for which a block has a host 
preference (for example, dfs blocks on a shared cluster).
Also note that a block might have one or more preferred locations.

b) We look up the rack for the preferred location to get the preferred rack.
As with (a), there need not be an executor on that rack. This is just the rack 
preference.


c) At schedule time, for an executor, we look up the host/rack of the executor's 
location - and decide appropriately based on that.



In this context, I think your requirement is already handled.
Even if we don't have any hosts alive on a rack, those tasks would still be 
listed with a rack-local preference in the task set manager.
When an executor comes in (existing or new), we check that executor's rack against 
the task preference - and it would then be marked rack local.
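
That said, a minimal sketch of the bookkeeping the jira title asks for -
tracking which hosts are alive on each rack - could look like the following.
The class, field, and method names are assumptions, not the eventual
implementation.

{code:title=Sketch: tracking alive hosts per rack in the scheduler (names are assumptions)}
import scala.collection.mutable.{HashMap, HashSet}

class RackTracker {
  // rack -> hosts currently alive on that rack
  private val hostsByRack = new HashMap[String, HashSet[String]]

  def executorAdded(host: String, rack: Option[String]): Unit =
    rack.foreach(r => hostsByRack.getOrElseUpdate(r, new HashSet[String]) += host)

  def executorRemoved(host: String, rack: Option[String]): Unit =
    rack.foreach { r =>
      hostsByRack.get(r).foreach { hosts =>
        hosts -= host
        if (hosts.isEmpty) hostsByRack -= r
      }
    }

  // What TaskSetManager would call to decide if a rack preference is satisfiable.
  def hasAliveHostOnRack(rack: String): Boolean = hostsByRack.contains(rack)
}
{code}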

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the task's 
 preferred location is available. Regarding a RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect, as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack and provide an 
 API for TaskSetManager to check whether there's a host alive on a specific rack.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler

2014-07-03 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-2353:
---

Description: 
I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and ignore ones which are not applicable) have resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime, 
actually) seems to be assuming 
a) a constant population of locality levels.
b) probably also immutability/repeatability of locality levels

These no longer hold.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Unfortunately, we do not have the bandwidth to tackle this issue - would be 
great if someone could take a look at it ! Thanks.

  was:

I suspect the recent changes from SPARK-1937 to compute valid locality levels 
(and ignoring ones which are not applicable) has resulted in this issue.
Specifically, some of the code using currentLocalityIndex (and lastLaunchTime 
actually) seems to be assuming 
a) constant population of locality levels.
b) probably also immutablility/repeatibility of locality levels

These do not hold any longer.
I do not have the exact values for which this failure was observed (since this 
is from the logs of a failed job) - but the code path is highly suspect.

Also note that the line numbers/classes might not exactly match master since we 
are in the middle of a merge. But the issue should hopefully be evident.

java.lang.ArrayIndexOutOfBoundsException: 2
at 
org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439)
at 
org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244)
at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241

Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-03 Thread Mridul Muralidharan
On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin r...@databricks.com wrote:
 On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan mri...@gmail.com
 wrote:


 
  The other thing we do need is the location of blocks. This is actually
 just
  O(n) because we just need to know where the map was run.

 For well partitioned data, wont this not involve a lot of unwanted
 requests to nodes which are not hosting data for a reducer (and lack
 of ability to throttle).


 Was that a question? (I'm guessing it is). What do you mean exactly?


I was not sure if I understood the proposal correctly - hence the
query : if I understood it right - the number of wasted requests goes
up by num_reducers * avg_nodes_not_hosting data.

Ofcourse, if avg_nodes_not_hosting data == 0, then we are fine !

Regards,
Mridul


[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack

2014-07-02 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14050886#comment-14050886
 ] 

Mridul Muralidharan commented on SPARK-2277:


I am not sure I follow this requirement.
For preferred locations, we populate their corresponding racks (if available) 
as preferred racks.

For available executor hosts, we look up the rack they belong to - and then see 
if that rack is preferred or not.

This, of course, assumes a host is only on a single rack.


What exactly is the behavior you are expecting from the scheduler?

 Make TaskScheduler track whether there's host on a rack
 ---

 Key: SPARK-2277
 URL: https://issues.apache.org/jira/browse/SPARK-2277
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Rui Li

 When TaskSetManager adds a pending task, it checks whether the task's 
 preferred location is available. Regarding a RACK_LOCAL task, we consider the 
 preferred rack available if such a rack is defined for the preferred host. 
 This is incorrect, as there may be no alive hosts on that rack at all. 
 Therefore, TaskScheduler should track the hosts on each rack and provide an 
 API for TaskSetManager to check whether there's a host alive on a specific rack.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-02 Thread Mridul Muralidharan
Hi Patrick,

  Please see inline.

Regards,
Mridul


On Wed, Jul 2, 2014 at 10:52 AM, Patrick Wendell pwend...@gmail.com wrote:
 b) Instead of pulling this information, push it to executors as part
 of task submission. (What Patrick mentioned ?)
 (1) a.1 from above is still an issue for this.

 I don't understand problem a.1 is. In this case, we don't need to do
 caching, right?


To rephrase in this context, attempting to cache won't help since it is
reducer specific and the benefits are minimal (other than for re-execution
on failures and for speculative tasks).



 (2) Serialized task size is also a concern : we have already seen
 users hitting akka limits for task size - this will be an additional
 vector which might exacerbate it.

 This would add only a small, constant amount of data to the task. It's
 strictly better than before. Before if the map output status array was
 size M x R, we send a single akka message to every node of size M x
 R... this basically scales quadratically with the size of the RDD. The
 new approach is constant... it's much better. And the total amount of
 data send over the wire is likely much less.


It would be a function of the number of mappers - and an overhead for each task.


Regards,
Mridul


 - Patrick


Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-02 Thread Mridul Muralidharan
Hi Reynold,

  Please see inline.

Regards,
Mridul

On Wed, Jul 2, 2014 at 10:57 AM, Reynold Xin r...@databricks.com wrote:
 I was actually talking to tgraves today at the summit about this.

 Based on my understanding, the sizes we track and send (which is
 unfortunately O(M*R) regardless of how we change the implementation --
 whether we send via task or send via MapOutputTracker) is only used to
 compute maxBytesInFlight so we can throttle the fetching speed to not
 result in oom. Perhaps for very large shuffles, we don't need to send the
 bytes for each block, and we can send whether they are zero or not (which
 can be tracked via a compressed bitmap that can be tiny).

You are right, currently for large blocks we just need to know where
the block exists.
I was not sure if there was any possible future extension on that -
for this reason, in order to preserve functionality, we moved from
Byte to Short for MapOutputTracker.compressedSize (to ensure large
sizes can be represented with at most ~0.7% error).

Within a MapStatus, we moved to holding compressed data to save space
within the master/workers (particularly for a large number of
reducers).

If we do not anticipate any other use for the sizes, we can move back
to using Byte instead of Short to compress the size (which will reduce
the required space by a factor of slightly less than 2): the error in the
computed size for blocks larger than maxBytesInFlight does not really matter,
since we will split them into different FetchRequests anyway.
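
For context, a sketch of the log-scale encoding being discussed (along the
lines of MapOutputTracker.compressSize); the base and the single-Byte width
here are assumptions for illustration, not the exact values in the code.

{code:title=Sketch: log-scale block size compression (base and width are assumptions)}
object SizeCodec {
  private val base = 1.1

  // Encode a size into one byte with bounded relative error by storing
  // ceil(log_base(size)); 0 is reserved for empty blocks, 1 for size 1.
  def compress(size: Long): Byte =
    if (size <= 0) 0
    else if (size <= 1) 1
    else math.min(255, math.ceil(math.log(size) / math.log(base)).toInt).toByte

  // Decode back to an (over-)estimate of the original size.
  def decompress(compressed: Byte): Long =
    if (compressed == 0) 0L else math.pow(base, compressed & 0xFF).toLong
}
{code}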



 The other thing we do need is the location of blocks. This is actually just
 O(n) because we just need to know where the map was run.

For well partitioned data, won't this involve a lot of unwanted
requests to nodes which are not hosting data for a reducer (and no
ability to throttle them)?


Regards,
Mridul



 On Tue, Jul 1, 2014 at 2:51 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

 We had considered both approaches (if I understood the suggestions right) :
 a) Pulling only map output states for tasks which run on the reducer
 by modifying the Actor. (Probably along lines of what Aaron described
 ?)
 The performance implication of this was bad :
 1) We cant cache serialized result anymore, (caching it makes no sense
 rather).
 2) The number requests to master will go from num_executors to
 num_reducers - the latter can be orders of magnitude higher than
 former.

 b) Instead of pulling this information, push it to executors as part
 of task submission. (What Patrick mentioned ?)
 (1) a.1 from above is still an issue for this.
 (2) Serialized task size is also a concern : we have already seen
 users hitting akka limits for task size - this will be an additional
 vector which might exacerbate it.
 Our jobs are not hitting this yet though !

 I was hoping there might be something in akka itself to alleviate this
 - but if not, we can solve it within context of spark.

 Currently, we have worked around it by using broadcast variable when
 serialized size is above some threshold - so that our immediate
 concerns are unblocked :-)
 But a better solution should be greatly welcomed !
 Maybe we can unify it with large serialized task as well ...


 Btw, I am not sure what the higher cost of BlockManager referred to is
 Aaron - do you mean the cost of persisting the serialized map outputs
 to disk ?




 Regards,
 Mridul


 On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell pwend...@gmail.com
 wrote:
  Yeah I created a JIRA a while back to piggy-back the map status info
  on top of the task (I honestly think it will be a small change). There
  isn't a good reason to broadcast the entire array and it can be an
  issue during large shuffles.
 
  - Patrick
 
  On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com
 wrote:
  I don't know of any way to avoid Akka doing a copy, but I would like to
  mention that it's on the priority list to piggy-back only the map
 statuses
  relevant to a particular map task on the task itself, thus reducing the
  total amount of data sent over the wire by a factor of N for N physical
  machines in your cluster. Ideally we would also avoid Akka entirely when
  sending the tasks, as these can get somewhat large and Akka doesn't work
  well with large messages.
 
  Do note that your solution of using broadcast to send the map tasks is
 very
  similar to how the executor returns the result of a task when it's too
 big
  for akka. We were thinking of refactoring this too, as using the block
  manager has much higher latency than a direct TCP send.
 
 
  On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com
 
  wrote:
 
  Our current hack is to use Broadcast variables when serialized
  statuses are above some (configurable) size : and have the workers
  directly pull them from master.
  This is a workaround : so would be great if there was a
  better/principled solution.
 
  Please note that the responses are going to different workers
  requesting for the output statuses for shuffle (after

Re: Eliminate copy while sending data : any Akka experts here ?

2014-07-01 Thread Mridul Muralidharan
We had considered both approaches (if I understood the suggestions right):
a) Pulling only the map output statuses for tasks which run on the reducer,
by modifying the Actor. (Probably along the lines of what Aaron described?)
The performance implications of this were bad:
1) We can't cache the serialized result anymore (caching it makes no sense, rather).
2) The number of requests to the master will go from num_executors to
num_reducers - the latter can be orders of magnitude higher than the
former.

b) Instead of pulling this information, push it to executors as part
of task submission. (What Patrick mentioned?)
(1) a.1 from above is still an issue for this.
(2) Serialized task size is also a concern: we have already seen
users hitting akka limits for task size - this will be an additional
vector which might exacerbate it.
Our jobs are not hitting this yet though!

I was hoping there might be something in akka itself to alleviate this
- but if not, we can solve it within the context of spark.

Currently, we have worked around it by using a broadcast variable when
the serialized size is above some threshold - so that our immediate
concerns are unblocked :-)
But a better solution would be greatly welcomed!
Maybe we can unify it with large serialized tasks as well ...


Btw, I am not sure what the higher cost of the BlockManager referred to is,
Aaron - do you mean the cost of persisting the serialized map outputs
to disk?




Regards,
Mridul


On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell pwend...@gmail.com wrote:
 Yeah I created a JIRA a while back to piggy-back the map status info
 on top of the task (I honestly think it will be a small change). There
 isn't a good reason to broadcast the entire array and it can be an
 issue during large shuffles.

 - Patrick

 On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com wrote:
 I don't know of any way to avoid Akka doing a copy, but I would like to
 mention that it's on the priority list to piggy-back only the map statuses
 relevant to a particular map task on the task itself, thus reducing the
 total amount of data sent over the wire by a factor of N for N physical
 machines in your cluster. Ideally we would also avoid Akka entirely when
 sending the tasks, as these can get somewhat large and Akka doesn't work
 well with large messages.

 Do note that your solution of using broadcast to send the map tasks is very
 similar to how the executor returns the result of a task when it's too big
 for akka. We were thinking of refactoring this too, as using the block
 manager has much higher latency than a direct TCP send.


 On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com
 wrote:

 Our current hack is to use Broadcast variables when serialized
 statuses are above some (configurable) size : and have the workers
 directly pull them from master.
 This is a workaround : so would be great if there was a
 better/principled solution.

 Please note that the responses are going to different workers
 requesting for the output statuses for shuffle (after map) - so not
 sure if back pressure buffers, etc would help.


 Regards,
 Mridul


 On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan mri...@gmail.com
 wrote:
  Hi,
 
While sending map output tracker result, the same serialized byte
  array is sent multiple times - but the akka implementation copies it
  to a private byte array within ByteString for each send.
  Caching a ByteString instead of Array[Byte] did not help, since akka
  does not support special casing ByteString : serializes the
  ByteString, and copies the result out to an array before creating
  ByteString out of it (in Array[Byte] serializing is thankfully simply
  returning same array - so one copy only).
 
 
  Given the need to send immutable data large number of times, is there
  any way to do it in akka without copying internally in akka ?
 
 
  To see how expensive it is, for 200 nodes with a large number of
  mappers and reducers, the status becomes something like 30 mb for us -
  and pulling this about 200 to 300 times results in OOM due to the
  large number of copies sent out.
 
 
  Thanks,
  Mridul



[jira] [Commented] (SPARK-2294) TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks get assigned to an executor

2014-06-26 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14045433#comment-14045433
 ] 

Mridul Muralidharan commented on SPARK-2294:


I agree; we should bump no-locality-preference and speculative tasks to the NODE_LOCAL 
level, scheduling them after NODE_LOCAL tasks (if available) rather than checking for 
them at PROCESS_LOCAL max locality. That way they get scheduled before RACK_LOCAL 
but after NODE_LOCAL.
This is an artifact of the design from when there was no PROCESS_LOCAL and 
NODE_LOCAL was the best schedule possible (before these levels existed explicitly: 
we had only node and any).

 TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks 
 get assigned to an executor
 ---

 Key: SPARK-2294
 URL: https://issues.apache.org/jira/browse/SPARK-2294
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0, 1.0.1
Reporter: Kay Ousterhout

 If an executor E is free, a task may be speculatively assigned to E when 
 there are other tasks in the job that have not been launched (at all) yet.  
 Similarly, a task without any locality preferences may be assigned to E when 
 there was another NODE_LOCAL task that could have been scheduled. 
 This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer 
 (which in turn calls TaskSetManager.findTask) with increasing locality 
 levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until 
 the highest currently allowed level.  Now, supposed NODE_LOCAL is the highest 
 currently allowed locality level.  The first time findTask is called, it will 
 be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL 
 tasks, it will try to schedule tasks with no locality preferences or 
 speculative tasks.  As a result, speculative tasks or tasks with no 
 preferences may be scheduled instead of NODE_LOCAL tasks.
 cc [~matei]



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown

2014-06-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043088#comment-14043088
 ] 

Mridul Muralidharan commented on SPARK-2268:


That is not because of this hook.
There are a bunch of places in Spark where FileSystem objects are (incorrectly, 
I should add) getting closed: some within shutdown hooks (check the stop method 
in various services in Spark) and others elsewhere (like the checkpointing code).

I have fixed a bunch of these as part of some other work ... it should come in a 
PR soon.

 Utils.createTempDir() creates race with HDFS at shutdown
 

 Key: SPARK-2268
 URL: https://issues.apache.org/jira/browse/SPARK-2268
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Marcelo Vanzin

 Utils.createTempDir() has this code:
 {code}
 // Add a shutdown hook to delete the temp dir when the JVM exits
 Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
   override def run() {
     // Attempt to delete if some path which is a parent of this is not already registered.
     if (!hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
   }
 })
 {code}
 This creates a race with the shutdown hooks registered by HDFS, since the 
 order of execution is undefined; if the HDFS hooks run first, you'll get 
 exceptions about the file system being closed.
 Instead, this should use Hadoop's ShutdownHookManager with a proper priority, 
 so that it runs before the HDFS hooks.
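
 A minimal sketch of that proposal, using Hadoop's ShutdownHookManager; the
 priority value is an assumption, and hasRootAsShutdownDeleteDir,
 Utils.deleteRecursively and dir come from the snippet above.

 {code:title=Sketch: registering the cleanup via Hadoop's ShutdownHookManager (priority is an assumption)}
 import org.apache.hadoop.util.ShutdownHookManager

 def registerCleanupHook(dir: java.io.File): Unit = {
   // Register with a priority higher than the HDFS FileSystem hook so the
   // temp-dir cleanup runs before the DFS client is closed.
   ShutdownHookManager.get().addShutdownHook(new Runnable {
     override def run(): Unit = {
       if (!hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
     }
   }, 50) // priority: higher values run earlier; the exact value is an assumption
 }
 {code}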



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown

2014-06-24 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043071#comment-14043071
 ] 

Mridul Muralidharan commented on SPARK-2268:


Setting priority for shutdown hooks does not have too much impact given the 
state of the VM.
Note that this hook is trying to delete local directories - not dfs directories.

 Utils.createTempDir() creates race with HDFS at shutdown
 

 Key: SPARK-2268
 URL: https://issues.apache.org/jira/browse/SPARK-2268
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Marcelo Vanzin

 Utils.createTempDir() has this code:
 {code}
 // Add a shutdown hook to delete the temp dir when the JVM exits
 Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
   override def run() {
     // Attempt to delete if some path which is a parent of this is not already registered.
     if (!hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
   }
 })
 {code}
 This creates a race with the shutdown hooks registered by HDFS, since the 
 order of execution is undefined; if the HDFS hooks run first, you'll get 
 exceptions about the file system being closed.
 Instead, this should use Hadoop's ShutdownHookManager with a proper priority, 
 so that it runs before the HDFS hooks.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: [jira] [Created] (SPARK-1867) Spark Documentation Error causes java.lang.IllegalStateException: unread block data

2014-06-23 Thread Mridul Muralidharan
There are a few interacting issues here - and unfortunately I don't
recall all of them (since this was fixed a few months back).
From memory though:

a) With shuffle consolidation, data sent to a remote node incorrectly
includes data from partially constructed blocks - not just the requested
blocks.
Actually, with shuffle consolidation (with and without failures) quite
a few things broke.

b) There might have been a few other bugs in DiskBlockObjectWriter too.

c) We also suspected buffers overlapping when using the cached kryo
serializer (though we never proved this, and have just disabled caching
across the board for now: we always create a new instance).

The way we debugged it was by introducing an Input/Output stream which
added a checksum to the data stream and validated it on each side, for
compression, serialization, etc.

Apologies for being non-specific ... I really don't have the details
right now, and our internal branch is in flux due to the effort to
merge our local changes to master.
Hopefully we will be able to submit PRs as soon as this is done and
testcases are added to validate them.
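
For anyone wanting to reproduce that debugging approach, here is a minimal
sketch of such checksumming wrappers, using plain java.util.zip; exactly where
they get spliced into the shuffle write/read path is not shown and was specific
to our internal branch.

{code:title=Sketch: checksumming wrapper streams for debugging the data path}
import java.io.{InputStream, OutputStream}
import java.util.zip.{CRC32, CheckedInputStream, CheckedOutputStream}

object ChecksumDebug {
  // Wrap the write side; record out.getChecksum.getValue alongside the block.
  def wrapOut(out: OutputStream): CheckedOutputStream =
    new CheckedOutputStream(out, new CRC32)

  // Wrap the read side; after fully reading, compare against the recorded value.
  def wrapIn(in: InputStream): CheckedInputStream =
    new CheckedInputStream(in, new CRC32)

  def verify(in: CheckedInputStream, expected: Long): Unit =
    require(in.getChecksum.getValue == expected, "stream corrupted: checksum mismatch")
}
{code}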


Regards,
Mridul




On Tue, Jun 24, 2014 at 10:21 AM, Reynold Xin r...@databricks.com wrote:
 Mridul,

 Can you comment a little bit more on this issue? We are running into the
 same stack trace but not sure whether it is just different Spark versions
 on each cluster (doesn't seem likely) or a bug in Spark.

 Thanks.



 On Sat, May 17, 2014 at 4:41 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

 I suspect this is an issue we have fixed internally here as part of a
 larger change - the issue we fixed was not a config issue but bugs in
 spark.

 Unfortunately we plan to contribute this as part of 1.1

 Regards,
 Mridul
 On 17-May-2014 4:09 pm, sam (JIRA) j...@apache.org wrote:

  sam created SPARK-1867:
  --
 
   Summary: Spark Documentation Error causes
  java.lang.IllegalStateException: unread block data
   Key: SPARK-1867
   URL: https://issues.apache.org/jira/browse/SPARK-1867
   Project: Spark
Issue Type: Bug
  Reporter: sam
 
 
  I've employed two System Administrators on a contract basis (for quite a
  bit of money), and both contractors have independently hit the following
  exception.  What we are doing is:
 
  1. Installing Spark 0.9.1 according to the documentation on the website,
  along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs.
  2. Building a fat jar with a Spark app with sbt then trying to run it on
  the cluster
 
  I've also included code snippets, and sbt deps at the bottom.
 
  When I've Googled this, there seems to be two somewhat vague responses:
  a) Mismatching spark versions on nodes/user code
  b) Need to add more jars to the SparkConf
 
  Now I know that (b) is not the problem having successfully run the same
  code on other clusters while only including one jar (it's a fat jar).
 
  But I have no idea how to check for (a) - it appears Spark doesn't have
  any version checks or anything - it would be nice if it checked versions
  and threw a mismatching version exception: you have user code using
  version X and node Y has version Z.
 
  I would be very grateful for advice on this.
 
  The exception:
 
  Exception in thread main org.apache.spark.SparkException: Job aborted:
  Task 0.0:1 failed 32 times (most recent failure: Exception failure:
  java.lang.IllegalStateException: unread block data)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.org
  $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
  at scala.Option.foreach(Option.scala:236)
  at
 
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386

[jira] [Commented] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections

2014-06-21 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742
 ] 

Mridul Muralidharan commented on SPARK-704:
---

If the remote node goes down, the SendingConnection would be notified, since it is also 
registered for read events (to handle precisely this case, actually).
The ReceivingConnection would anyway be notified, since it is waiting on reads on 
that socket.

This, of course, assumes that the local node detects the remote node failure at the tcp 
layer.
Problems come in when 

 ConnectionManager sometimes cannot detect loss of sending connections
 -

 Key: SPARK-704
 URL: https://issues.apache.org/jira/browse/SPARK-704
 Project: Spark
  Issue Type: Bug
Reporter: Charles Reiss
Assignee: Henry Saputra

 ConnectionManager currently does not detect when SendingConnections 
 disconnect except if it is trying to send through them. As a result, a node 
 failure just after a connection is initiated but before any acknowledgement 
 messages can be sent may result in a hang.
 ConnectionManager has code intended to detect this case by detecting the 
 failure of a corresponding ReceivingConnection, but this code assumes that 
 the remote host:port of the ReceivingConnection is the same as the 
 ConnectionManagerId, which is almost never true. Additionally, there does not 
 appear to be any reason to assume a corresponding ReceivingConnection will 
 exist.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections

2014-06-21 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742
 ] 

Mridul Muralidharan edited comment on SPARK-704 at 6/21/14 9:10 AM:


If the remote node goes down, the SendingConnection would be notified, since it is also 
registered for read events (to handle precisely this case, actually).
The ReceivingConnection would be notified, since it is waiting on reads on that 
socket.

This, of course, assumes that the local node detects the remote node failure at the tcp 
layer.
Problems come in when this is not detected due to no activity on the socket (at 
the app and socket level - keepalive timeout, etc).
Usually this is detected via application-level ping/keepalive messages: not 
sure if we want to introduce that into spark ...


was (Author: mridulm80):
If remote node goes down, SendingConnection would be notified since it is also 
registered for read events (to handle precisely this case actually).
ReceivingConnection would anyway be notified since it is waiting on reads on 
that socket.

This, ofcourse, assumes that local node detects remote node failure at tcp 
layer.
Problems come in when 

 ConnectionManager sometimes cannot detect loss of sending connections
 -

 Key: SPARK-704
 URL: https://issues.apache.org/jira/browse/SPARK-704
 Project: Spark
  Issue Type: Bug
Reporter: Charles Reiss
Assignee: Henry Saputra

 ConnectionManager currently does not detect when SendingConnections 
 disconnect except if it is trying to send through them. As a result, a node 
 failure just after a connection is initiated but before any acknowledgement 
 messages can be sent may result in a hang.
 ConnectionManager has code intended to detect this case by detecting the 
 failure of a corresponding ReceivingConnection, but this code assumes that 
 the remote host:port of the ReceivingConnection is the same as the 
 ConnectionManagerId, which is almost never true. Additionally, there does not 
 appear to be any reason to assume a corresponding ReceivingConnection will 
 exist.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2223) Building and running tests with maven is extremely slow

2014-06-20 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039217#comment-14039217
 ] 

Mridul Muralidharan commented on SPARK-2223:


[~tgraves] You could try running zinc - it speeds up the maven build quite a bit.
I find that I need to shut it down and restart it at times though ... but otherwise, 
it works fine.

 Building and running tests with maven is extremely slow
 ---

 Key: SPARK-2223
 URL: https://issues.apache.org/jira/browse/SPARK-2223
 Project: Spark
  Issue Type: Bug
  Components: Build
Affects Versions: 1.0.0
Reporter: Thomas Graves

 For some reason using maven with Spark is extremely slow.  Building and 
 running tests takes way longer than other projects I have used that use 
 maven.  We should investigate to see why.  



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-06-20 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039236#comment-14039236
 ] 

Mridul Muralidharan commented on SPARK-2089:


[~pwendell] SplitInfo is not from hadoop - it gives locality preferences in a 
Spark context (see org.apache.spark.scheduler.SplitInfo) in a reasonably 
api-agnostic way.
The default support provided for it is hadoop specific, based on dfs blocks - 
but I don't think there is anything stopping us from expressing other forms 
(either already, or with minor modifications as applicable).

We actually use that api very heavily - moving 10s or 100s of TB of data tends 
to be fairly expensive :-)
Since we are still stuck on 0.9 + changes, we have not yet faced this issue, 
so it is great to see this being addressed.



 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza
Assignee: Sandy Ryza
Priority: Critical

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code that .  The Spark-Yarn code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty unset version.  This occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Java IO Stream Corrupted - Invalid Type AC?

2014-06-18 Thread Mridul Muralidharan
On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman
suren.hira...@velos.io wrote:
 Patrick,

 My team is using shuffle consolidation but not speculation. We are also
 using persist(DISK_ONLY) for caching.


Use of shuffle consolidation is probably what is causing the issue.
It would be a good idea to try again with that turned off (which is the default).

It should get fixed most likely in 1.1 timeframe.


Regards,
Mridul



 Here are some config changes that are in our work-in-progress.

 We've been trying for 2 weeks to get our production flow (maybe around
 50-70 stages, a few forks and joins with up to 20 branches in the forks) to
 run end to end without any success, running into other problems besides
 this one as well. For example, we have run into situations where saving to
 HDFS just hangs on a couple of tasks, which are printing out nothing in
 their logs and not taking any CPU. For testing, our input data is 10 GB
 across 320 input splits and generates maybe around 200-300 GB of
 intermediate and final data.


 conf.set("spark.executor.memory", "14g") // TODO make this configurable

 // shuffle configs
 conf.set("spark.default.parallelism", "320") // TODO make this configurable
 conf.set("spark.shuffle.consolidateFiles", "true")

 conf.set("spark.shuffle.file.buffer.kb", "200")
 conf.set("spark.reducer.maxMbInFlight", "96")

 conf.set("spark.rdd.compress", "true")

 // we ran into a problem with the default timeout of 60 seconds
 // this is also being set in the master's spark-env.sh. Not sure if
 // it needs to be in both places
 conf.set("spark.worker.timeout", "180")

 // akka settings
 conf.set("spark.akka.threads", "300")
 conf.set("spark.akka.timeout", "180")
 conf.set("spark.akka.frameSize", "100")
 conf.set("spark.akka.batchSize", "30")
 conf.set("spark.akka.askTimeout", "30")

 // block manager
 conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
 conf.set("spark.blockManagerHeartBeatMs", "8")

 -Suren



 On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell pwend...@gmail.com wrote:

 Out of curiosity - are you guys using speculation, shuffle
 consolidation, or any other non-default option? If so that would help
 narrow down what's causing this corruption.

 On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
 suren.hira...@velos.io wrote:
  Matt/Ryan,
 
  Did you make any headway on this? My team is running into this also.
  Doesn't happen on smaller datasets. Our input set is about 10 GB but we
  generate 100s of GBs in the flow itself.
 
  -Suren
 
 
 
 
  On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton compton.r...@gmail.com
 wrote:
 
  Just ran into this today myself. I'm on branch-1.0 using a CDH3
  cluster (no modifications to Spark or its dependencies). The error
  appeared trying to run GraphX's .connectedComponents() on a ~200GB
  edge list (GraphX worked beautifully on smaller data).
 
  Here's the stacktrace (it's quite similar to yours
  https://imgur.com/7iBA4nJ ).
 
  14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
  4 times; aborting job
  14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
  VertexRDD.scala:100
  Exception in thread main org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 5.599:39 failed 4 times, most
  recent failure: Exception failure in TID 29735 on host node18:
  java.io.StreamCorruptedException: invalid type code: AC
 
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 
 
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 
 
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 
 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 
 
 org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
 
 
 org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
 
 
 org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
 
 
 org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  

[jira] [Commented] (SPARK-1353) IllegalArgumentException when writing to disk

2014-06-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14033625#comment-14033625
 ] 

Mridul Muralidharan commented on SPARK-1353:


This is due to a limitation in Spark which is being addressed in 
https://issues.apache.org/jira/browse/SPARK-1476.

 IllegalArgumentException when writing to disk
 -

 Key: SPARK-1353
 URL: https://issues.apache.org/jira/browse/SPARK-1353
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
 Environment: AWS EMR 3.2.30-49.59.amzn1.x86_64 #1 SMP  x86_64 
 GNU/Linux
 Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4 built 2014-03-18
Reporter: Jim Blomo
Priority: Minor

 The Executor may fail when trying to mmap a file bigger than 
 Integer.MAX_VALUE due to the constraints of FileChannel.map 
 (http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map(java.nio.channels.FileChannel.MapMode,
  long, long)).  The signature takes longs, but the size value must be less 
 than MAX_VALUE.  This manifests with the following backtrace:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
 at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:337)
 at 
 org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:281)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:430)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:38)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
 at 
 org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
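
 For context, a minimal sketch of working around the FileChannel.map limit by
 mapping a large file in sub-2GB chunks follows; it illustrates the constraint
 described above and is not the SPARK-1476 implementation.

 {code:title=Sketch: mapping a file in chunks below Integer.MAX_VALUE}
 import java.io.RandomAccessFile
 import java.nio.MappedByteBuffer
 import java.nio.channels.FileChannel

 object ChunkedMmap {
   // FileChannel.map takes longs but rejects sizes above Integer.MAX_VALUE,
   // so a file larger than ~2GB has to be mapped as several buffers.
   def mapInChunks(path: String, chunkSize: Long = Int.MaxValue.toLong): Seq[MappedByteBuffer] = {
     val channel = new RandomAccessFile(path, "r").getChannel
     try {
       val length = channel.size()
       (0L until length by chunkSize).map { offset =>
         channel.map(FileChannel.MapMode.READ_ONLY, offset, math.min(chunkSize, length - offset))
       }
     } finally {
       channel.close()
     }
   }
 }
 {code}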



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2018) Big-Endian (IBM Power7) Spark Serialization issue

2014-06-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027808#comment-14027808
 ] 

Mridul Muralidharan commented on SPARK-2018:


Ah! This is an interesting bug.
By default Spark uses Java serialization, so endianness should not be an issue - yet 
you are facing it! (I am assuming you have not customized serialization.)
Is it possible for you to dump the data written and read at both ends, along with the 
env vars and JVM details?
Actually, Spark does not do anything fancy for default serialization, so a 
simple example without Spark in the picture could also be tried (write a serialized 
object to a file on the master node, read it back on the slave node, and see if it works - see the sketch below).
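
A minimal, hypothetical round trip along those lines - plain JDK serialization with no Spark involved; the file path and payload are illustrative assumptions:

{code}
import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationRoundTrip {
  // Run writeSample on the big-endian node, copy /tmp/ser-test.bin to the
  // little-endian node (or use shared storage), then run readSample there.
  val path = "/tmp/ser-test.bin"  // illustrative path

  def writeSample(): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(Map("hello" -> 42, "spark" -> 1)) finally out.close()
  }

  def readSample(): Unit = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try println(in.readObject()) finally in.close()
  }
}
{code}

If the plain round trip works across the two machines, the suspicion would shift back to something Spark- or environment-specific rather than java.io serialization itself.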

 Big-Endian (IBM Power7)  Spark Serialization issue
 --

 Key: SPARK-2018
 URL: https://issues.apache.org/jira/browse/SPARK-2018
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.0.0
 Environment: hardware : IBM Power7
 OS:Linux version 2.6.32-358.el6.ppc64 
 (mockbu...@ppc-017.build.eng.bos.redhat.com) (gcc version 4.4.7 20120313 (Red 
 Hat 4.4.7-3) (GCC) ) #1 SMP Tue Jan 29 11:43:27 EST 2013
 JDK: Java(TM) SE Runtime Environment (build pxp6470sr5-20130619_01(SR5))
 IBM J9 VM (build 2.6, JRE 1.7.0 Linux ppc64-64 Compressed References 
 20130617_152572 (JIT enabled, AOT enabled)
 Hadoop: Hadoop-2.3.0-CDH5.0
 Spark:Spark-1.0.0 or Spark-0.9.1
 spark-env.sh:
 export JAVA_HOME=/opt/ibm/java-ppc64-70/
 export SPARK_MASTER_IP=9.114.34.69
 export SPARK_WORKER_MEMORY=1m
 export SPARK_CLASSPATH=/home/test1/spark-1.0.0-bin-hadoop2/lib
 export  STANDALONE_SPARK_MASTER_HOST=9.114.34.69
 #export SPARK_JAVA_OPTS=' -Xdebug 
 -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n '
Reporter: Yanjie Gao

 We have an application running on Spark on a Power7 system,
 but we have hit an important issue with serialization.
 The example HdfsWordCount reproduces the problem:
 ./bin/run-example  org.apache.spark.examples.streaming.HdfsWordCount 
 localdir
 We used Power7 (Big-Endian arch) and Red Hat 6.4.
 Big-Endian is the main cause, since the example ran successfully in another 
 Power-based Little-Endian setup.
 Here is the exception stack and log:
 Spark Executor Command: /opt/ibm/java-ppc64-70//bin/java -cp 
 /home/test1/spark-1.0.0-bin-hadoop2/lib::/home/test1/src/spark-1.0.0-bin-hadoop2/conf:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/
  -XX:MaxPermSize=128m  -Xdebug 
 -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n -Xms512M 
 -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend 
 akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler 2 
 p7hvs7br16 4 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker 
 app-20140604023054-
 
 14/06/04 02:31:20 WARN util.NativeCodeLoader: Unable to load native-hadoop 
 library for your platform... using builtin-java classes where applicable
 14/06/04 02:31:21 INFO spark.SecurityManager: Changing view acls to: 
 test1,yifeng
 14/06/04 02:31:21 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(test1, yifeng)
 14/06/04 02:31:22 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/06/04 02:31:22 INFO Remoting: Starting remoting
 14/06/04 02:31:22 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkExecutor@p7hvs7br16:39658]
 14/06/04 02:31:22 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://sparkExecutor@p7hvs7br16:39658]
 14/06/04 02:31:22 INFO executor.CoarseGrainedExecutorBackend: Connecting to 
 driver: akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler
 14/06/04 02:31:22 INFO worker.WorkerWatcher: Connecting to worker 
 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker
 14/06/04 02:31:23 INFO worker.WorkerWatcher: Successfully connected to 
 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker
 14/06/04 02:31:24 INFO executor.CoarseGrainedExecutorBackend: Successfully 
 registered with driver
 14/06/04 02:31:24 INFO spark.SecurityManager: Changing view acls to: 
 test1,yifeng
 14/06/04 02:31:24 INFO spark.SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(test1, yifeng)
 14/06/04 02:31:24 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/06/04 02:31:24 INFO Remoting: Starting remoting
 14/06/04 02:31:24 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://spark

[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored

2014-06-10 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14026397#comment-14026397
 ] 

Mridul Muralidharan commented on SPARK-2089:


preferredNodeLocationData used to be passed as a constructor parameter - and so it was 
always available.
The rearrangement of the SparkContext initialization has introduced this bug.


 With YARN, preferredNodeLocalityData isn't honored 
 ---

 Key: SPARK-2089
 URL: https://issues.apache.org/jira/browse/SPARK-2089
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Sandy Ryza

 When running in YARN cluster mode, apps can pass preferred locality data when 
 constructing a Spark context that will dictate where to request executor 
 containers.
 This is currently broken because of a race condition.  The Spark-YARN code 
 runs the user class and waits for it to start up a SparkContext.  During its 
 initialization, the SparkContext will create a YarnClusterScheduler, which 
 notifies a monitor in the Spark-YARN code that initialization has occurred.  The Spark-YARN code then 
 immediately fetches the preferredNodeLocationData from the SparkContext and 
 uses it to start requesting containers.
 But in the SparkContext constructor that takes the preferredNodeLocationData, 
 setting preferredNodeLocationData comes after the rest of the initialization, 
 so, if the Spark-YARN code comes around quickly enough after being notified, 
 the data that's fetched is the empty, unset version.  This occurred during all 
 of my runs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020789#comment-14020789
 ] 

Mridul Muralidharan commented on SPARK-2064:


Depending on how long a job runs, this can cause an OOM on the master.
In YARN (and Mesos?) an executor on the same node gets a different port if 
relaunched after a failure - and so ends up as a different executor in the list.

 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020936#comment-14020936
 ] 

Mridul Muralidharan commented on SPARK-2064:


It is 100 MB (or more) of memory which could be used elsewhere.
In our clusters, for example, the number of workers can be very high while the 
containers can be quite ephemeral when under load (and so a lot of container 
losses); on the other hand, memory per container is constrained to about 8 gig 
(lower once we account for overheads, etc).

So the amount of working memory in the master shrinks: we are finding that the UI and 
related codepaths are among the portions occupying a lot of 
memory in the OOM dumps of the master.

 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021008#comment-14021008
 ] 

Mridul Muralidharan commented on SPARK-2064:


Unfortunately OOM is a very big issue for us, since the application master is a single 
point of failure when running in YARN - particularly when memory is constrained and 
vigorously enforced by the YARN containers (requiring higher overheads to be specified, 
reducing usable memory even further).

Given this, and given the fair amount of churn already for executor containers, I am 
hesitant about features which add to the memory footprint of the UI even further. 
The cumulative impact of the UI is nontrivial, as I mentioned before. This, for 
example, would require 1-8% of master memory when there is reasonable churn for 
long-running jobs (30 hours) on a reasonable number of executors (200-300).


 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead

2014-06-07 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021011#comment-14021011
 ] 

Mridul Muralidharan commented on SPARK-2064:


I am probably missing the intent behind this change.
What is the expected use case it is supposed to help with ?

 web ui should not remove executors if they are dead
 ---

 Key: SPARK-2064
 URL: https://issues.apache.org/jira/browse/SPARK-2064
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin

 We should always show the list of executors that have ever been connected, 
 and add a status column to mark them as dead if they have been disconnected.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large

2014-06-05 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14019394#comment-14019394
 ] 

Mridul Muralidharan commented on SPARK-2017:


Currently, for our jobs, I run with spark.ui.retainedStages=3 (so that there is 
some visibility into past stages): this is to prevent OOMs in the master when the 
number of tasks per stage is not low (50k, for example, is not very high imo).

The stage details UI becomes very sluggish to pretty much unresponsive for our 
jobs where the task count exceeds 30k ... though that might also be a browser issue 
(Firefox/Chrome)?
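
For reference, a minimal sketch of applying that setting programmatically (the app name is illustrative; the property can equally go into spark-defaults.conf):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Cap how many completed stages the web UI retains, so long-running jobs with
// many stages/tasks do not accumulate unbounded UI state in the driver.
val conf = new SparkConf()
  .setAppName("retained-stages-example")  // illustrative
  .set("spark.ui.retainedStages", "3")

// Master/deploy mode are assumed to come from spark-submit.
val sc = new SparkContext(conf)
{code}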

 web ui stage page becomes unresponsive when the number of tasks is large
 

 Key: SPARK-2017
 URL: https://issues.apache.org/jira/browse/SPARK-2017
 Project: Spark
  Issue Type: Sub-task
Reporter: Reynold Xin
  Labels: starter

 {code}
 sc.parallelize(1 to 1000000, 1000000).count()
 {code}
 The above code creates one million tasks to be executed. The stage detail web 
 ui page takes forever to load (if it ever completes).
 There are again a few different alternatives:
 0. Limit the number of tasks we show.
 1. Pagination
 2. By default only show the aggregate metrics and failed tasks, and hide the 
 successful ones.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default

2014-05-28 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011741#comment-14011741
 ] 

Mridul Muralidharan commented on SPARK-1956:


Shuffle consolidation MUST NOT be enabled - whether by default or 
intentionally.
In 1.0 it is very badly broken - we needed a whole litany of fixes for it 
before it was reasonably stable.

The current plan is to contribute most of these back in the 1.1 timeframe.
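
For context, a minimal sketch of the flag in question (spark.shuffle.consolidateFiles, which defaults to false in 1.0); setting it explicitly just documents the intent to keep it off:

{code}
import org.apache.spark.SparkConf

// Leave shuffle file consolidation disabled (its 1.0 default) until the
// stability fixes referenced above land.
val conf = new SparkConf()
  .set("spark.shuffle.consolidateFiles", "false")
{code}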

 Enable shuffle consolidation by default
 ---

 Key: SPARK-1956
 URL: https://issues.apache.org/jira/browse/SPARK-1956
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 1.0.0
Reporter: Sandy Ryza

 The only drawbacks are on ext3, and most everyone has ext4 at this point.  I 
 think it's better to aim the default at the common case.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing

2014-05-18 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14001377#comment-14001377
 ] 

Mridul Muralidharan commented on SPARK-1855:


Did not realize that mail replies to JIRA mails did not get mirrored to JIRA ! 
Replicating my mail here :

– cut and paste –

We don't have 3x replication in Spark :-)
And if we use a replicated StorageLevel, while it decreases the odds of failure, it 
does not eliminate them (since we are not doing a great job with replication 
anyway from a fault-tolerance point of view).
Also, it does take a nontrivial performance hit with replicated levels.

Regards,
Mridul
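
For reference, a minimal sketch of the replicated storage levels discussed above (MEMORY_AND_DISK_2 is the stock two-replica level), assuming an existing SparkContext sc; the RDD contents are illustrative:

{code}
import org.apache.spark.storage.StorageLevel

// Keep two copies of each cached partition (memory first, spilling to disk);
// this reduces - but does not eliminate - recomputation on node loss.
val data = sc.parallelize(1 to 1000)
data.persist(StorageLevel.MEMORY_AND_DISK_2)
{code}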

 Provide memory-and-local-disk RDD checkpointing
 ---

 Key: SPARK-1855
 URL: https://issues.apache.org/jira/browse/SPARK-1855
 Project: Spark
  Issue Type: New Feature
  Components: MLlib, Spark Core
Affects Versions: 1.0.0
Reporter: Xiangrui Meng

 Checkpointing is used to cut long lineage while maintaining fault tolerance. 
 The current implementation is HDFS-based. Using the BlockRDD we can create 
 in-memory-and-local-disk (with replication) checkpoints that are not as 
 reliable as HDFS-based solution but faster.
 It can help applications that require many iterations.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-18 Thread Mridul Muralidharan
On 18-May-2014 5:05 am, Mark Hamstra m...@clearstorydata.com wrote:

 I don't understand.  We never said that interfaces wouldn't change from
0.9

Agreed.

 to 1.0.  What we are committing to is stability going forward from the
 1.0.0 baseline.  Nobody is disputing that backward-incompatible behavior
or
 interface changes would be an issue post-1.0.0.  The question is whether

The point is, how confident are we that these are the right set of
interface definitions?
We think they are, but we could also have gone through a 0.10 release to vet the
proposed 1.0 changes and stabilize them.

To give examples for which we don't have solutions currently (which we are
facing internally here btw, so this is not an academic exercise):

- The current Spark shuffle model breaks very badly as the number of partitions
(input and output) increases.

- As the number of nodes increases, the overhead per node keeps going up. Spark
is currently geared more towards large-memory machines; when the RAM per
node is modest (8 to 16 gig) but a large number of them are available, it
does not do too well.

- The current block abstraction breaks as data per block goes beyond 2 gig.

- Cogroup/join currently breaks when the value per key or the number of keys
(or both) is high.

- Shuffle consolidation is so badly broken it is not funny.

- Currently there is no way of effectively leveraging accelerator
cards/coprocessors/gpus from spark - to do so, I suspect we will need to
redefine OFF_HEAP.

- Effectively leveraging ssd is still an open question IMO when you have
mix of both available.

We have resolved some of these and are looking at the rest. These are not
unique to our internal usage profile; I have seen most of these asked about
elsewhere too.

Thankfully some of the 1.0 changes actually are geared towards helping to
alleviate some of the above (the Iterable change, for example); most of the rest are
internal implementation details of Spark core, which helps a lot - but there are cases
where this is not so.

Unfortunately I don't know yet if the unresolved/uninvestigated issues will
require more changes or not.

Given this, I am very skeptical of expecting the current Spark interfaces to be
sufficient for the next year (forget three).

I understand this is an argument which can be made to never release 1.0 :-)
Which is why I was ok with a 1.0 instead of 0.10 release in spite of my
preference.

This is a good problem to have IMO ... People are using spark extensively
and in circumstances that we did not envision : necessitating changes even
to spark core.

But the claim that 1.0 interfaces are stable is not something I buy - they
are not; we will need to break them soon, and the cost of maintaining backward
compatibility will be high.

We just need to make an informed decision to live with that cost, not hand
wave it away.

Regards
Mridul

 there is anything apparent now that is expected to require such disruptive
 changes if we were to commit to the current release candidate as our
 guaranteed 1.0.0 baseline.


 On Sat, May 17, 2014 at 2:05 PM, Mridul Muralidharan mri...@gmail.com
wrote:

  I would make the case for interface stability not just api stability.
  Particularly given that we have significantly changed some of our
  interfaces, I want to ensure developers/users are not seeing red flags.
 
  Bugs and code stability can be addressed in minor releases if found, but
  behavioral change and/or interface changes would be a much more invasive
  issue for our users.
 
  Regards
  Mridul
  On 18-May-2014 2:19 am, Matei Zaharia matei.zaha...@gmail.com wrote:
 
   As others have said, the 1.0 milestone is about API stability, not
about
   saying “we’ve eliminated all bugs”. The sooner you declare 1.0, the
  sooner
   users can confidently build on Spark, knowing that the application
they
   build today will still run on Spark 1.9.9 three years from now. This
is
   something that I’ve seen done badly (and experienced the effects
thereof)
   in other big data projects, such as MapReduce and even YARN. The
result
  is
   that you annoy users, you end up with a fragmented userbase where
  everyone
   is building against a different version, and you drastically slow down
   development.
  
   With a project as fast-growing as fast-growing as Spark in particular,
   there will be new bugs discovered and reported continuously,
especially
  in
   the non-core components. Look at the graph of # of contributors in
time
  to
   Spark: https://www.ohloh.net/p/apache-spark (bottom-most graph;
  “commits”
   changed when we started merging each patch as a single commit). This
is
  not
   slowing down, and we need to have the culture now that we treat API
   stability and release numbers at the level expected for a 1.0 project
   instead of having people come in and randomly change the API.
  
   I’ll also note that the issues marked “blocker” were marked so by
their
   reporters, since the reporter can set the priority. I don’t consider
  stuff
   like parallelize() not partitioning ranges in the same way as other
   collections

Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-18 Thread Mridul Muralidharan
So I think I need to clarify a few things here - particularly since
this mail went to the wrong mailing list and a much wider audience
than I intended it for :-)


Most of the issues I mentioned are internal implementation details of
spark core: which means we can enhance them in future without
disruption to our userbase (e.g. the ability to support a large number of
input/output partitions. Note: this is of the order of 100k input and
output partitions with a uniform spread of keys - very rarely seen
outside of some crazy jobs).

Some of the issues I mentioned would require DeveloperApi changes -
which are not user exposed: they would impact developer use of these
APIs - which are mostly internally provided by spark (like fixing
blocks > 2G, which would require a change to the Serializer API).

A smaller fraction might require interface changes - note, I am
referring specifically to configuration changes (removing/deprecating
some) and possibly newer options to submit/env, etc - I don't envision
any programming API change itself.
The only API change we did was from Seq to Iterable - which is
actually to address some of the issues I mentioned (join/cogroup).

Remaining are bugs which need to be addressed or the feature
removed/enhanced like shuffle consolidation.

There might be a semantic extension of some things like the OFF_HEAP storage
level to address other computation models - but that would not have an
impact on end users, since other options would be pluggable with the
default set to Tachyon so that there is no change in user expectations.


So will the interface possibly change? Sure, though we will try to
keep it backward compatible (as we did with 1.0).
Will the API change - other than backward-compatible enhancements? Probably not.


Regards,
Mridul


On Sun, May 18, 2014 at 12:11 PM, Mridul Muralidharan mri...@gmail.com wrote:

 On 18-May-2014 5:05 am, Mark Hamstra m...@clearstorydata.com wrote:

 I don't understand.  We never said that interfaces wouldn't change from
 0.9

 Agreed.

 to 1.0.  What we are committing to is stability going forward from the
 1.0.0 baseline.  Nobody is disputing that backward-incompatible behavior
 or
 interface changes would be an issue post-1.0.0.  The question is whether

 The point is, how confident are we that these are the right set of interface
 definitions.
 We think it is, but we could also have gone through a 0.10 to vet the
 proposed 1.0 changes to stabilize them.

 To give examples for which we don't have solutions currently (which we are
 facing internally here btw, so not academic exercise) :

 - Current spark shuffle model breaks very badly as number of partitions
 increases (input and output).

 - As number of nodes increase, the overhead per node keeps going up. Spark
 currently is more geared towards large memory machines; when the RAM per
 node is modest (8 to 16 gig) but large number of them are available, it does
 not do too well.

 - Current block abstraction breaks as data per block goes beyond 2 gig.

 - Cogroup/join when value per key or number of keys (or both) is high breaks
 currently.

 - Shuffle consolidation is so badly broken it is not funny.

 - Currently there is no way of effectively leveraging accelerator
 cards/coprocessors/gpus from spark - to do so, I suspect we will need to
 redefine OFF_HEAP.

 - Effectively leveraging ssd is still an open question IMO when you have mix
 of both available.

 We have resolved some of these and looking at the rest. These are not unique
 to our internal usage profile, I have seen most of these asked elsewhere
 too.

 Thankfully some of the 1.0 changes actually are geared towards helping to
 alleviate some of the above (Iterable change for ex), most of the rest are
 internal impl detail of spark core which helps a lot - but there are cases
 where this is not so.

 Unfortunately I don't know yet if the unresolved/uninvestigated issues will
 require more changes or not.

 Given this I am very skeptical of expecting current spark interfaces to be
 sufficient for next 1 year (forget 3)

 I understand this is an argument which can be made to never release 1.0 :-)
 Which is why I was ok with a 1.0 instead of 0.10 release in spite of my
 preference.

 This is a good problem to have IMO ... People are using spark extensively
 and in circumstances that we did not envision : necessitating changes even
 to spark core.

 But the claim that 1.0 interfaces are stable is not something I buy - they
 are not, we will need to break them soon and cost of maintaining backward
 compatibility will be high.

 We just need to make an informed decision to live with that cost, not hand
 wave it away.

 Regards
 Mridul

 there is anything apparent now that is expected to require such disruptive
 changes if we were to commit to the current release candidate as our
 guaranteed 1.0.0 baseline.


 On Sat, May 17, 2014 at 2:05 PM, Mridul Muralidharan
 mri...@gmail.comwrote:

  I would make the case for interface stability not just api stability.
  Particularly given

Re: [jira] [Created] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing

2014-05-18 Thread Mridul Muralidharan
My bad ... I was replying via mobile, and I did not realize responses
to JIRA mails were not mirrored to JIRA - unlike PR responses !


Regards,
Mridul

On Sun, May 18, 2014 at 2:50 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
 We do actually have replicated StorageLevels in Spark. You can use 
 MEMORY_AND_DISK_2 or construct your own StorageLevel with your own custom 
 replication factor.

 BTW you guys should probably have this discussion on the JIRA rather than the 
 dev list; I think the replies somehow ended up on the dev list.

 Matei

 On May 17, 2014, at 1:36 AM, Mridul Muralidharan mri...@gmail.com wrote:

 We don't have 3x replication in spark :-)
 And if we use replicated storagelevel, while decreasing odds of failure, it
 does not eliminate it (since we are not doing a great job with replication
 anyway from fault tolerance point of view).
 Also it does take a nontrivial performance hit with replicated levels.

 Regards,
 Mridul
 On 17-May-2014 8:16 am, Xiangrui Meng men...@gmail.com wrote:

 With 3x replication, we should be able to achieve fault tolerance.
 This checkPointed RDD can be cleared if we have another in-memory
 checkPointed RDD down the line. It can avoid hitting disk if we have
 enough memory to use. We need to investigate more to find a good
 solution. -Xiangrui

 On Fri, May 16, 2014 at 4:00 PM, Mridul Muralidharan mri...@gmail.com
 wrote:
 Effectively this is persist without fault tolerance.
 Failure of any node means complete lack of fault tolerance.
 I would be very skeptical of truncating lineage if it is not reliable.
 On 17-May-2014 3:49 am, Xiangrui Meng (JIRA) j...@apache.org wrote:

 Xiangrui Meng created SPARK-1855:
 

 Summary: Provide memory-and-local-disk RDD checkpointing
 Key: SPARK-1855
 URL: https://issues.apache.org/jira/browse/SPARK-1855
 Project: Spark
  Issue Type: New Feature
  Components: MLlib, Spark Core
Affects Versions: 1.0.0
Reporter: Xiangrui Meng


 Checkpointing is used to cut long lineage while maintaining fault
 tolerance. The current implementation is HDFS-based. Using the BlockRDD
 we
 can create in-memory-and-local-disk (with replication) checkpoints that
 are
 not as reliable as HDFS-based solution but faster.

 It can help applications that require many iterations.



 --
 This message was sent by Atlassian JIRA
 (v6.2#6252)





Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-17 Thread Mridul Muralidharan
I had echoed similar sentiments a while back when there was a discussion
around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the api
changes, add missing functionality, go through a hardening release before
1.0

But the community preferred a 1.0 :-)

Regards,
Mridul

On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote:

 On this note, non-binding commentary:

 Releases happen in local minima of change, usually created by
 internally enforced code freeze. Spark is incredibly busy now due to
 external factors -- recently a TLP, recently discovered by a large new
 audience, ease of contribution enabled by Github. It's getting like
 the first year of mainstream battle-testing in a month. It's been very
 hard to freeze anything! I see a number of non-trivial issues being
 reported, and I don't think it has been possible to triage all of
 them, even.

 Given the high rate of change, my instinct would have been to release
 0.10.0 now. But won't it always be very busy? I do think the rate of
 significant issues will slow down.

 Version ain't nothing but a number, but if it has any meaning it's the
 semantic versioning meaning. 1.0 imposes extra handicaps around
 striving to maintain backwards-compatibility. That may end up being
 bent to fit in important changes that are going to be required in this
 continuing period of change. Hadoop does this all the time
 unfortunately and gets away with it, I suppose -- minor version
 releases are really major. (On the other extreme, HBase is at 0.98 and
 quite production-ready.)

 Just consider this a second vote for focus on fixes and 1.0.x rather
 than new features and 1.x. I think there are a few steps that could
 streamline triage of this flood of contributions, and make all of this
 easier, but that's for another thread.


 On Fri, May 16, 2014 at 8:50 PM, Mark Hamstra m...@clearstorydata.com
wrote:
  +1, but just barely.  We've got quite a number of outstanding bugs
  identified, and many of them have fixes in progress.  I'd hate to see
those
  efforts get lost in a post-1.0.0 flood of new features targeted at
1.1.0 --
  in other words, I'd like to see 1.0.1 retain a high priority relative to
  1.1.0.
 
  Looking through the unresolved JIRAs, it doesn't look like any of the
  identified bugs are show-stoppers or strictly regressions (although I
will
  note that one that I have in progress, SPARK-1749, is a bug that we
  introduced with recent work -- it's not strictly a regression because we
  had equally bad but different behavior when the DAGScheduler exceptions
  weren't previously being handled at all vs. being slightly mis-handled
  now), so I'm not currently seeing a reason not to release.


Re: [jira] [Created] (SPARK-1867) Spark Documentation Error causes java.lang.IllegalStateException: unread block data

2014-05-17 Thread Mridul Muralidharan
I suspect this is an issue we have fixed internally here as part of a
larger change - the issue we fixed was not a config issue but bugs in spark.

Unfortunately we plan to contribute this as part of 1.1

Regards,
Mridul
On 17-May-2014 4:09 pm, sam (JIRA) j...@apache.org wrote:

 sam created SPARK-1867:
 --

  Summary: Spark Documentation Error causes
 java.lang.IllegalStateException: unread block data
  Key: SPARK-1867
  URL: https://issues.apache.org/jira/browse/SPARK-1867
  Project: Spark
   Issue Type: Bug
 Reporter: sam


 I've employed two System Administrators on a contract basis (for quite a
 bit of money), and both contractors have independently hit the following
 exception.  What we are doing is:

 1. Installing Spark 0.9.1 according to the documentation on the website,
 along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs.
 2. Building a fat jar with a Spark app with sbt then trying to run it on
 the cluster

 I've also included code snippets, and sbt deps at the bottom.

 When I've Googled this, there seem to be two somewhat vague responses:
 a) Mismatching spark versions on nodes/user code
 b) Need to add more jars to the SparkConf

 Now I know that (b) is not the problem having successfully run the same
 code on other clusters while only including one jar (it's a fat jar).

 But I have no idea how to check for (a) - it appears Spark doesn't have
 any version checks or anything - it would be nice if it checked versions
 and threw a mismatching version exception: you have user code using
 version X and node Y has version Z.

 I would be very grateful for advice on this.

 The exception:

 Exception in thread main org.apache.spark.SparkException: Job aborted:
 Task 0.0:1 failed 32 times (most recent failure: Exception failure:
 java.lang.IllegalStateException: unread block data)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 14/05/16 18:05:31 INFO scheduler.TaskSetManager: Loss was due to
 java.lang.IllegalStateException: unread block data [duplicate 59]

 My code snippet:

 val conf = new SparkConf()
.setMaster(clusterMaster)
.setAppName(appName)
.setSparkHome(sparkHome)
.setJars(SparkContext.jarOfClass(this.getClass))

 println("count = " + new SparkContext(conf).textFile(someHdfsPath).count())

 My SBT dependencies:

 // relevant
 "org.apache.spark" % "spark-core_2.10" % "0.9.1",
 "org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",

 // standard, probably unrelated
 "com.github.seratch" %% "awscala" % "[0.2,)",
 "org.scalacheck" %% "scalacheck" % "1.10.1" % "test",
 "org.specs2" %% "specs2" % "1.14" % "test",
 "org.scala-lang" % "scala-reflect" % "2.10.3",
 "org.scalaz" %% "scalaz-core" % "7.0.5",
 "net.minidev" % "json-smart" % "1.2"



 --
 This message was sent by Atlassian JIRA
 (v6.2#6252)



Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-17 Thread Mridul Muralidharan
We made incompatible api changes whose impact we don't know yet completely
: both from implementation and usage point of view.

We had the option of getting real-world feedback from the user community if
we had gone to 0.10 but the spark developers seemed to be in a hurry to get
to 1.0 - so I made my opinion known but left it to the wisdom of larger
group of committers to decide ... I did not think it was critical enough to
do a binding -1 on.

Regards
Mridul
On 17-May-2014 9:43 pm, Mark Hamstra m...@clearstorydata.com wrote:

 Which of the unresolved bugs in spark-core do you think will require an
 API-breaking change to fix?  If there are none of those, then we are still
 essentially on track for a 1.0.0 release.

 The number of contributions and pace of change now is quite high, but I
 don't think that waiting for the pace to slow before releasing 1.0 is
 viable.  If Spark's short history is any guide to its near future, the pace
 will not slow by any significant amount for any noteworthy length of time,
 but rather will continue to increase.  What we need to be aiming for, I
 think, is to have the great majority of those new contributions being made
 to MLLlib, GraphX, SparkSQL and other areas of the code that we have
 clearly marked as not frozen in 1.x. I think we are already seeing that,
 but if I am just not recognizing breakage of our semantic versioning
 guarantee that will be forced on us by some pending changes, now would be a
 good time to set me straight.


 On Sat, May 17, 2014 at 4:26 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

  I had echoed similar sentiments a while back when there was a discussion
  around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the api
  changes, add missing functionality, go through a hardening release before
  1.0
 
  But the community preferred a 1.0 :-)
 
  Regards,
  Mridul
 
  On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote:
  
   On this note, non-binding commentary:
  
   Releases happen in local minima of change, usually created by
   internally enforced code freeze. Spark is incredibly busy now due to
   external factors -- recently a TLP, recently discovered by a large new
   audience, ease of contribution enabled by Github. It's getting like
   the first year of mainstream battle-testing in a month. It's been very
   hard to freeze anything! I see a number of non-trivial issues being
   reported, and I don't think it has been possible to triage all of
   them, even.
  
   Given the high rate of change, my instinct would have been to release
   0.10.0 now. But won't it always be very busy? I do think the rate of
   significant issues will slow down.
  
   Version ain't nothing but a number, but if it has any meaning it's the
   semantic versioning meaning. 1.0 imposes extra handicaps around
   striving to maintain backwards-compatibility. That may end up being
   bent to fit in important changes that are going to be required in this
   continuing period of change. Hadoop does this all the time
   unfortunately and gets away with it, I suppose -- minor version
   releases are really major. (On the other extreme, HBase is at 0.98 and
   quite production-ready.)
  
   Just consider this a second vote for focus on fixes and 1.0.x rather
   than new features and 1.x. I think there are a few steps that could
   streamline triage of this flood of contributions, and make all of this
   easier, but that's for another thread.
  
  
   On Fri, May 16, 2014 at 8:50 PM, Mark Hamstra m...@clearstorydata.com
 
  wrote:
+1, but just barely.  We've got quite a number of outstanding bugs
identified, and many of them have fixes in progress.  I'd hate to see
  those
efforts get lost in a post-1.0.0 flood of new features targeted at
  1.1.0 --
in other words, I'd like to see 1.0.1 retain a high priority relative
  to
1.1.0.
   
Looking through the unresolved JIRAs, it doesn't look like any of the
identified bugs are show-stoppers or strictly regressions (although I
  will
note that one that I have in progress, SPARK-1749, is a bug that we
introduced with recent work -- it's not strictly a regression because
  we
had equally bad but different behavior when the DAGScheduler
 exceptions
weren't previously being handled at all vs. being slightly
 mis-handled
now), so I'm not currently seeing a reason not to release.
 



Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-17 Thread Mridul Muralidharan
On 17-May-2014 11:40 pm, Mark Hamstra m...@clearstorydata.com wrote:

 That is a past issue that we don't need to be re-opening now.  The present

Huh ? If we need to revisit based on changed circumstances, we must - the
scope of changes introduced in this release was definitely not anticipated
when 1.0 vs 0.10 discussion happened.

If folks are worried about stability of core; it is a valid concern IMO.

Having said that, I am still ok with going to 1.0; but if a conversation
starts about need for 1.0 vs going to 0.10 I want to hear more and possibly
allay the concerns and not try to muzzle the discussion.


Regards
Mridul

 issue, and what I am asking, is which pending bug fixes does anyone
 anticipate will require breaking the public API guaranteed in rc9


 On Sat, May 17, 2014 at 9:44 AM, Mridul Muralidharan mri...@gmail.com
wrote:

  We made incompatible api changes whose impact we don't know yet
completely
  : both from implementation and usage point of view.
 
  We had the option of getting real-world feedback from the user
community if
  we had gone to 0.10 but the spark developers seemed to be in a hurry to
get
  to 1.0 - so I made my opinion known but left it to the wisdom of larger
  group of committers to decide ... I did not think it was critical
enough to
  do a binding -1 on.
 
  Regards
  Mridul
  On 17-May-2014 9:43 pm, Mark Hamstra m...@clearstorydata.com wrote:
 
   Which of the unresolved bugs in spark-core do you think will require
an
   API-breaking change to fix?  If there are none of those, then we are
  still
   essentially on track for a 1.0.0 release.
  
   The number of contributions and pace of change now is quite high, but
I
   don't think that waiting for the pace to slow before releasing 1.0 is
   viable.  If Spark's short history is any guide to its near future, the
  pace
   will not slow by any significant amount for any noteworthy length of
  time,
   but rather will continue to increase.  What we need to be aiming for,
I
   think, is to have the great majority of those new contributions being
  made
   to MLLlib, GraphX, SparkSQL and other areas of the code that we have
   clearly marked as not frozen in 1.x. I think we are already seeing
that,
   but if I am just not recognizing breakage of our semantic versioning
   guarantee that will be forced on us by some pending changes, now would
  be a
   good time to set me straight.
  
  
   On Sat, May 17, 2014 at 4:26 AM, Mridul Muralidharan mri...@gmail.com
   wrote:
  
I had echoed similar sentiments a while back when there was a
  discussion
around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the
api
changes, add missing functionality, go through a hardening release
  before
1.0
   
But the community preferred a 1.0 :-)
   
Regards,
Mridul
   
On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote:

 On this note, non-binding commentary:

 Releases happen in local minima of change, usually created by
 internally enforced code freeze. Spark is incredibly busy now due
to
 external factors -- recently a TLP, recently discovered by a large
  new
 audience, ease of contribution enabled by Github. It's getting
like
 the first year of mainstream battle-testing in a month. It's been
  very
 hard to freeze anything! I see a number of non-trivial issues
being
 reported, and I don't think it has been possible to triage all of
 them, even.

 Given the high rate of change, my instinct would have been to
release
 0.10.0 now. But won't it always be very busy? I do think the rate
of
 significant issues will slow down.

 Version ain't nothing but a number, but if it has any meaning it's
  the
 semantic versioning meaning. 1.0 imposes extra handicaps around
 striving to maintain backwards-compatibility. That may end up
being
 bent to fit in important changes that are going to be required in
  this
 continuing period of change. Hadoop does this all the time
 unfortunately and gets away with it, I suppose -- minor version
 releases are really major. (On the other extreme, HBase is at 0.98
  and
 quite production-ready.)

 Just consider this a second vote for focus on fixes and 1.0.x
rather
 than new features and 1.x. I think there are a few steps that
could
 streamline triage of this flood of contributions, and make all of
  this
 easier, but that's for another thread.


 On Fri, May 16, 2014 at 8:50 PM, Mark Hamstra 
  m...@clearstorydata.com
   
wrote:
  +1, but just barely.  We've got quite a number of outstanding
bugs
  identified, and many of them have fixes in progress.  I'd hate
to
  see
those
  efforts get lost in a post-1.0.0 flood of new features targeted
at
1.1.0 --
  in other words, I'd like to see 1.0.1 retain a high priority
  relative
to
  1.1.0.
 
  Looking through the unresolved JIRAs, it doesn't look like

Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-17 Thread Mridul Muralidharan
On 18-May-2014 1:45 am, Mark Hamstra m...@clearstorydata.com wrote:

 I'm not trying to muzzle the discussion.  All I am saying is that we don't
 need to have the same discussion about 0.10 vs. 1.0 that we already had.

Agreed, no point in repeating the same discussion ... I am also trying to
understand what the concerns are.

Specifically though, the scope of 1.0 (in terms of changes) went up quite a
bit - a lot of which are new changes and features; not just the initially
envisioned api changes and stability fixes.

If this is raising concerns, particularly since lot of users are depending
on stability of spark interfaces (api, env, scripts, behavior); I want to
understand better what they are - and if they are legitimately serious
enough, we will need to revisit decision to go to 1.0 instead of 0.10 ...
I hope we don't need to though given how late we are in dev cycle

Regards
Mridul

  If you can tell me about specific changes in the current release
candidate
 that occasion new arguments for why a 1.0 release is an unacceptable idea,
 then I'm listening.


 On Sat, May 17, 2014 at 11:59 AM, Mridul Muralidharan mri...@gmail.com
wrote:

  On 17-May-2014 11:40 pm, Mark Hamstra m...@clearstorydata.com wrote:
  
   That is a past issue that we don't need to be re-opening now.  The
  present
 
  Huh ? If we need to revisit based on changed circumstances, we must -
the
  scope of changes introduced in this release was definitely not
anticipated
  when 1.0 vs 0.10 discussion happened.
 
  If folks are worried about stability of core; it is a valid concern IMO.
 
  Having said that, I am still ok with going to 1.0; but if a conversation
  starts about need for 1.0 vs going to 0.10 I want to hear more and
possibly
  allay the concerns and not try to muzzle the discussion.
 
 
  Regards
  Mridul
 
   issue, and what I am asking, is which pending bug fixes does anyone
   anticipate will require breaking the public API guaranteed in rc9
  
  
   On Sat, May 17, 2014 at 9:44 AM, Mridul Muralidharan mri...@gmail.com
  wrote:
  
We made incompatible api changes whose impact we don't know yet
  completely
: both from implementation and usage point of view.
   
We had the option of getting real-world feedback from the user
  community if
we had gone to 0.10 but the spark developers seemed to be in a
hurry to
  get
to 1.0 - so I made my opinion known but left it to the wisdom of
larger
group of committers to decide ... I did not think it was critical
  enough to
do a binding -1 on.
   
Regards
Mridul
On 17-May-2014 9:43 pm, Mark Hamstra m...@clearstorydata.com
  wrote:
   
 Which of the unresolved bugs in spark-core do you think will
require
  an
 API-breaking change to fix?  If there are none of those, then we
are
still
 essentially on track for a 1.0.0 release.

 The number of contributions and pace of change now is quite high,
but
  I
 don't think that waiting for the pace to slow before releasing
1.0 is
 viable.  If Spark's short history is any guide to its near future,
  the
pace
 will not slow by any significant amount for any noteworthy length
of
time,
 but rather will continue to increase.  What we need to be aiming
for,
  I
 think, is to have the great majority of those new contributions
being
made
 to MLLlib, GraphX, SparkSQL and other areas of the code that we
have
 clearly marked as not frozen in 1.x. I think we are already seeing
  that,
 but if I am just not recognizing breakage of our semantic
versioning
 guarantee that will be forced on us by some pending changes, now
  would
be a
 good time to set me straight.


 On Sat, May 17, 2014 at 4:26 AM, Mridul Muralidharan 
  mri...@gmail.com
 wrote:

  I had echoed similar sentiments a while back when there was a
discussion
  around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize
the
  api
  changes, add missing functionality, go through a hardening
release
before
  1.0
 
  But the community preferred a 1.0 :-)
 
  Regards,
  Mridul
 
  On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote:
  
   On this note, non-binding commentary:
  
   Releases happen in local minima of change, usually created by
   internally enforced code freeze. Spark is incredibly busy now
due
  to
   external factors -- recently a TLP, recently discovered by a
  large
new
   audience, ease of contribution enabled by Github. It's getting
  like
   the first year of mainstream battle-testing in a month. It's
been
very
   hard to freeze anything! I see a number of non-trivial issues
  being
   reported, and I don't think it has been possible to triage
all of
   them, even.
  
   Given the high rate of change, my instinct would have been to
  release
   0.10.0 now. But won't it always be very busy? I do think the
rate

Re: [VOTE] Release Apache Spark 1.0.0 (rc5)

2014-05-17 Thread Mridul Muralidharan
I would make the case for interface stability not just api stability.
Particularly given that we have significantly changed some of our
interfaces, I want to ensure developers/users are not seeing red flags.

Bugs and code stability can be addressed in minor releases if found, but
behavioral change and/or interface changes would be a much more invasive
issue for our users.

Regards
Mridul
On 18-May-2014 2:19 am, Matei Zaharia matei.zaha...@gmail.com wrote:

 As others have said, the 1.0 milestone is about API stability, not about
 saying “we’ve eliminated all bugs”. The sooner you declare 1.0, the sooner
 users can confidently build on Spark, knowing that the application they
 build today will still run on Spark 1.9.9 three years from now. This is
 something that I’ve seen done badly (and experienced the effects thereof)
 in other big data projects, such as MapReduce and even YARN. The result is
 that you annoy users, you end up with a fragmented userbase where everyone
 is building against a different version, and you drastically slow down
 development.

 With a project as fast-growing as fast-growing as Spark in particular,
 there will be new bugs discovered and reported continuously, especially in
 the non-core components. Look at the graph of # of contributors in time to
 Spark: https://www.ohloh.net/p/apache-spark (bottom-most graph; “commits”
 changed when we started merging each patch as a single commit). This is not
 slowing down, and we need to have the culture now that we treat API
 stability and release numbers at the level expected for a 1.0 project
 instead of having people come in and randomly change the API.

 I’ll also note that the issues marked “blocker” were marked so by their
 reporters, since the reporter can set the priority. I don’t consider stuff
 like parallelize() not partitioning ranges in the same way as other
 collections a blocker — it’s a bug, it would be good to fix it, but it only
 affects a small number of use cases. Of course if we find a real blocker
 (in particular a regression from a previous version, or a feature that’s
 just completely broken), we will delay the release for that, but at some
 point you have to say “okay, this fix will go into the next maintenance
 release”. Maybe we need to write a clear policy for what the issue
 priorities mean.

 Finally, I believe it’s much better to have a culture where you can make
 releases on a regular schedule, and have the option to make a maintenance
 release in 3-4 days if you find new bugs, than one where you pile up stuff
 into each release. This is what much large project than us, like Linux, do,
 and it’s the only way to avoid indefinite stalling with a large contributor
 base. In the worst case, if you find a new bug that warrants immediate
 release, it goes into 1.0.1 a week after 1.0.0 (we can vote on 1.0.1 in
 three days with just your bug fix in it). And if you find an API that you’d
 like to improve, just add a new one and maybe deprecate the old one — at
 some point we have to respect our users and let them know that code they
 write today will still run tomorrow.

 Matei

 On May 17, 2014, at 10:32 AM, Kan Zhang kzh...@apache.org wrote:

  +1 on the running commentary here, non-binding of course :-)
 
 
  On Sat, May 17, 2014 at 8:44 AM, Andrew Ash and...@andrewash.com
 wrote:
 
  +1 on the next release feeling more like a 0.10 than a 1.0
  On May 17, 2014 4:38 AM, Mridul Muralidharan mri...@gmail.com
 wrote:
 
  I had echoed similar sentiments a while back when there was a
 discussion
  around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the api
  changes, add missing functionality, go through a hardening release
 before
  1.0
 
  But the community preferred a 1.0 :-)
 
  Regards,
  Mridul
 
  On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote:
 
  On this note, non-binding commentary:
 
  Releases happen in local minima of change, usually created by
  internally enforced code freeze. Spark is incredibly busy now due to
  external factors -- recently a TLP, recently discovered by a large new
  audience, ease of contribution enabled by Github. It's getting like
  the first year of mainstream battle-testing in a month. It's been very
  hard to freeze anything! I see a number of non-trivial issues being
  reported, and I don't think it has been possible to triage all of
  them, even.
 
  Given the high rate of change, my instinct would have been to release
  0.10.0 now. But won't it always be very busy? I do think the rate of
  significant issues will slow down.
 
  Version ain't nothing but a number, but if it has any meaning it's the
  semantic versioning meaning. 1.0 imposes extra handicaps around
  striving to maintain backwards-compatibility. That may end up being
  bent to fit in important changes that are going to be required in this
  continuing period of change. Hadoop does this all the time
  unfortunately and gets away with it, I suppose -- minor version
  releases are really

[jira] [Commented] (SPARK-1849) Broken UTF-8 encoded data gets character replacements and thus can't be fixed

2014-05-16 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14000397#comment-14000397
 ] 

Mridul Muralidharan commented on SPARK-1849:


Looks like textFile is probably the wrong api to use.
You cannot recover from badly encoded data ... Better would be to write your 
own InputFormat which does what you need.
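
For reference, a minimal sketch of the direction hinted at in the issue body below: read the raw Text bytes via hadoopFile and decode them yourself with the charset you believe the data is in (ISO-8859-1 here is an illustrative assumption, as are sc and path):

{code}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Bypass Text.toString (which substitutes U+FFFD for invalid UTF-8) and
// decode the raw bytes with the suspected source encoding instead.
val lines = sc.hadoopFile(path, classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text])
  .map { case (_, text) =>
    // Use getLength: the backing array returned by getBytes can be longer
    // than the valid content.
    new String(text.getBytes, 0, text.getLength, "ISO-8859-1")
  }
{code}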

 Broken UTF-8 encoded data gets character replacements and thus can't be 
 fixed
 ---

 Key: SPARK-1849
 URL: https://issues.apache.org/jira/browse/SPARK-1849
 Project: Spark
  Issue Type: Bug
Reporter: Harry Brundage
 Fix For: 1.0.0, 0.9.1

 Attachments: encoding_test


 I'm trying to process a file which isn't valid UTF-8 data inside hadoop using 
 Spark via {{sc.textFile()}}. Is this possible, and if not, is this a bug that 
 we should fix? It looks like {{HadoopRDD}} uses 
 {{org.apache.hadoop.io.Text.toString}} on all the data it ever reads, which I 
 believe replaces invalid UTF-8 byte sequences with the UTF-8 replacement 
 character, \uFFFD. Some example code mimicking what {{sc.textFile}} does 
 underneath:
 {code}
 scala> sc.textFile(path).collect()(0)
 res8: String = ?pple
 scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
 classOf[Text]).map(pair => pair._2.toString).collect()(0).getBytes()
 res9: Array[Byte] = Array(-17, -65, -67, 112, 112, 108, 101)
 scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
 classOf[Text]).map(pair => pair._2.getBytes).collect()(0)
 res10: Array[Byte] = Array(-60, 112, 112, 108, 101)
 {code}
 In the above example, the first two snippets show the string representation 
 and byte representation of the example line of text. The third snippet shows 
 what happens if you call {{getBytes}} on the {{Text}} object which comes back 
 from hadoop land: we get the real bytes in the file out.
 Now, I think this is a bug, though you may disagree. The text inside my file 
 is perfectly valid iso-8859-1 encoded bytes, which I would like to be able to 
 rescue and re-encode into UTF-8, because I want my application to be smart 
 like that. I think Spark should give me the raw broken string so I can 
 re-encode, but I can't get at the original bytes in order to guess at what 
 the source encoding might be, as they have already been replaced. I'm dealing 
 with data from some CDN access logs which are to put it nicely diversely 
 encoded, but I think a use case Spark should fully support. So, my suggested 
 fix, which I'd like some guidance, is to change {{textFile}} to spit out 
 broken strings by not using {{Text}}'s UTF-8 encoding.
 Further compounding this issue is that my application is actually in PySpark, 
 but we can talk about how bytes fly through to Scala land after this if we 
 agree that this is an issue at all. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: [jira] [Created] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing

2014-05-16 Thread Mridul Muralidharan
Effectively this is persist without fault tolerance.
Failure of any node means complete lack of fault tolerance.
I would be very skeptical of truncating lineage if it is not reliable.
 On 17-May-2014 3:49 am, Xiangrui Meng (JIRA) j...@apache.org wrote:

 Xiangrui Meng created SPARK-1855:
 

  Summary: Provide memory-and-local-disk RDD checkpointing
  Key: SPARK-1855
  URL: https://issues.apache.org/jira/browse/SPARK-1855
  Project: Spark
   Issue Type: New Feature
   Components: MLlib, Spark Core
 Affects Versions: 1.0.0
 Reporter: Xiangrui Meng


 Checkpointing is used to cut long lineage while maintaining fault
 tolerance. The current implementation is HDFS-based. Using the BlockRDD we
 can create in-memory-and-local-disk (with replication) checkpoints that are
 not as reliable as HDFS-based solution but faster.

 It can help applications that require many iterations.



 --
 This message was sent by Atlassian JIRA
 (v6.2#6252)



Re: [VOTE] Release Apache Spark 1.0.0 (rc6)

2014-05-16 Thread Mridul Muralidharan
So was rc5 cancelled ? Did not see a note indicating that or why ... [1]

- Mridul


[1] could have easily missed it in the email storm though !

On Thu, May 15, 2014 at 1:32 AM, Patrick Wendell pwend...@gmail.com wrote:
 Please vote on releasing the following candidate as Apache Spark version 
 1.0.0!

 This patch has a few minor fixes on top of rc5. I've also built the
 binary artifacts with Hive support enabled so people can test this
 configuration. When we release 1.0 we might just release both vanilla
 and Hive-enabled binaries.

 The tag to be voted on is v1.0.0-rc6 (commit 54133a):
 https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=54133abdce0246f6643a1112a5204afb2c4caa82

 The release files, including signatures, digests, etc. can be found at:
 http://people.apache.org/~pwendell/spark-1.0.0-rc6/

 Release artifacts are signed with the following key:
 https://people.apache.org/keys/committer/pwendell.asc

 The staging repository for this release can be found at:
 https://repository.apache.org/content/repositories/orgapachestratos-1011

 The documentation corresponding to this release can be found at:
 http://people.apache.org/~pwendell/spark-1.0.0-rc6-docs/

 Please vote on releasing this package as Apache Spark 1.0.0!

 The vote is open until Saturday, May 17, at 20:58 UTC and passes if
 a majority of at least 3 +1 PMC votes are cast.

 [ ] +1 Release this package as Apache Spark 1.0.0
 [ ] -1 Do not release this package because ...

 To learn more about Apache Spark, please see
 http://spark.apache.org/

 == API Changes ==
 We welcome users to compile Spark applications against 1.0. There are
 a few API changes in this release. Here are links to the associated
 upgrade guides - user facing changes have been kept as small as
 possible.

 changes to ML vector specification:
 http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/mllib-guide.html#from-09-to-10

 changes to the Java API:
 http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/java-programming-guide.html#upgrading-from-pre-10-versions-of-spark

 changes to the streaming API:
 http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/streaming-programming-guide.html#migration-guide-from-091-or-below-to-1x

 changes to the GraphX API:
 http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/graphx-programming-guide.html#upgrade-guide-from-spark-091

 coGroup and related functions now return Iterable[T] instead of Seq[T]
 ==> Call toSeq on the result to restore the old behavior

 SparkContext.jarOfClass returns Option[String] instead of Seq[String]
 ==> Call toSeq on the result to restore old behavior


Re: [jira] [Created] (SPARK-1767) Prefer HDFS-cached replicas when scheduling data-local tasks

2014-05-15 Thread Mridul Muralidharan
Hi Sandy,

  I assume you are referring to the caching added to datanodes via the new caching
API exposed through the NN ? (To preemptively mmap blocks).

I have not looked in detail, but does NN tell us about this in block
locations?
If yes, we can simply make those process local instead of node local for
executors on that node.

This would simply be a change to the hadoop-based RDD's partitioning (what makes
it tricky is exposing the currently 'alive' executors to the partitioning logic).

Thanks
Mridul
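A rough sketch of the kind of information involved (the helper name is made up, and it assumes a Hadoop version whose BlockLocation exposes getCachedHosts() plus an existing SparkContext). A locality-aware RDD could feed these hosts into its preferred locations, or mark them process-local once the scheduler knows which executors are alive there:

{code}
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

// List the hosts holding an HDFS-cached (DataNode mmap'ed) replica for each
// block of a file, keyed by block offset.
def cachedHostsPerBlock(sc: SparkContext, pathStr: String): Seq[(Long, Seq[String])] = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(sc.hadoopConfiguration)
  val status = fs.getFileStatus(path)
  fs.getFileBlockLocations(status, 0L, status.getLen).map { loc =>
    (loc.getOffset, loc.getCachedHosts.toSeq)   // hosts where the DN has the block cached
  }.toSeq
}
{code}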
On 15-May-2014 3:49 am, Sandy Ryza (JIRA) j...@apache.org wrote:

 Sandy Ryza created SPARK-1767:
 -

  Summary: Prefer HDFS-cached replicas when scheduling
 data-local tasks
  Key: SPARK-1767
  URL: https://issues.apache.org/jira/browse/SPARK-1767
  Project: Spark
   Issue Type: Improvement
   Components: Spark Core
 Affects Versions: 1.0.0
 Reporter: Sandy Ryza






 --
 This message was sent by Atlassian JIRA
 (v6.2#6252)



[jira] [Commented] (SPARK-1813) Add a utility to SparkConf that makes using Kryo really easy

2014-05-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13996390#comment-13996390
 ] 

Mridul Muralidharan commented on SPARK-1813:


Writing a KryoRegistrator is the only requirement - the rest is done as part of 
initialization anyway.
Registering classes with Kryo is non-trivial except for degenerate cases: for 
example, we have classes for which we have to use Java read/writeObject 
serialization, classes which support Kryo serialization, classes which support 
Java's external serialization, generated classes, etc.
And we would still need a registrator for those ... of course, it could be argued 
this is a corner case, though I don't think so.
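To make the point concrete, a hedged sketch of a registrator where one class can be registered directly while another has to fall back to Kryo's JavaSerializer to keep its Java (de)serialization semantics; all class names are made up:

{code}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator

// Illustrative types only.
case class SimpleRecord(id: Int, name: String)                      // trivially Kryo-friendly
class LegacyJavaRecord(val payload: String) extends Serializable    // relies on Java serialization behaviour

class CustomRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[SimpleRecord])
    // Fall back to Kryo's JavaSerializer so the class keeps its Java
    // (de)serialization behaviour instead of field-by-field Kryo handling.
    kryo.register(classOf[LegacyJavaRecord], new JavaSerializer)
  }
}
{code}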

 Add a utility to SparkConf that makes using Kryo really easy
 

 Key: SPARK-1813
 URL: https://issues.apache.org/jira/browse/SPARK-1813
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Sandy Ryza

 It would be nice to have a method in SparkConf that makes it really easy to 
 use Kryo and register a set of classes, without defining your own registrator.
 Using Kryo currently requires all this:
 {code}
 import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.serializer.KryoRegistrator
 class MyRegistrator extends KryoRegistrator {
   override def registerClasses(kryo: Kryo) {
 kryo.register(classOf[MyClass1])
 kryo.register(classOf[MyClass2])
   }
 }
 val conf = new SparkConf().setMaster(...).setAppName(...)
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
 val sc = new SparkContext(conf)
 {code}
 It would be nice if it just required this:
 {code}
 SparkConf.setKryo(Array(classOf[MyFirstClass], classOf[MySecond]))
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: bug using kryo as closure serializer

2014-05-04 Thread Mridul Muralidharan
On a slightly related note (apologies Soren for hijacking the thread),
Reynold how much better is kryo from spark's usage point of view
compared to the default java serialization (in general, not for
closures) ?
The numbers on the Kryo site are interesting, but since you have played
the most with Kryo in the context of Spark (I think) - how do you rate it
along the lines of:

1) computational overhead compared to java serialization.
2) memory overhead.
3) generated byte[] size.


Particularly given the bugs Patrick and I had looked into in the past
around flush, etc., I was always skeptical about using Kryo.
But given the pretty nasty issues with OOMs via Java serialization we
are seeing, I wanted to know your thoughts on the use of Kryo with Spark.
(It will be slightly involved to ensure everything gets registered, but I
want to go down that path assuming I hear good things in the context of
Spark.)

Thanks,
Mridul
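(For anyone curious about (3), a crude way to eyeball serialized sizes for your own record types directly via Spark's serializer instances - not a benchmark, and the record type and sizes here are made up:)

{code}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

// Compare the serialized size of one sample record under both serializers.
case class Sample(id: Long, tags: Seq[String], score: Double)

val conf = new SparkConf().setAppName("ser-size-check")   // app name illustrative
val record = Sample(42L, Seq("a", "b", "c"), 0.5)

val javaSize = new JavaSerializer(conf).newInstance().serialize(record).limit()
val kryoSize = new KryoSerializer(conf).newInstance().serialize(record).limit()
println(s"java=$javaSize bytes, kryo=$kryoSize bytes")
{code}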


On Mon, May 5, 2014 at 1:20 AM, Reynold Xin r...@databricks.com wrote:
 I added the config option to use the non-default serializer. However, at
 the time, Kryo fails serializing pretty much any closures so that option
 was never really used / recommended.

 Since then the Scala ecosystem has developed, and some other projects are
 starting to use Kryo to serialize more Scala data structures, so I wouldn't
 be surprised if there is a way to work around this now. However, I don't
 have enough time to look into it at this point. If you do, please do post
 your findings. Thanks.



 On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

 apologies for the cross-list posts, but I've gotten zero response in the
 user list and I guess this list is probably more appropriate.

 According to the documentation, using the KryoSerializer for closures is
 supported. However, when I try to set `spark.closure.serializer` to
 `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably.

 The first thing that happens is that it throws exceptions over and over
 saying that it cannot locate my registrator class, which is located in my assembly
 jar like so:

 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
 spark.kryo.registrator
 java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
 at

 org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:116)
 at

 org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 Now, I would expect it not to be able to find this class since it hasn't
 yet fetched my assembly jar to the executors. Once it does fetch my jar,
 those exceptions stop. Next, all the executor tasks die with the following
 exception:

 java.nio.ReadOnlyBufferException
 at java.nio.ByteBuffer.array(ByteBuffer.java:961)
 at

 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 

[jira] [Commented] (SPARK-1606) spark-submit needs `--arg` for every application parameter

2014-05-03 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988756#comment-13988756
 ] 

Mridul Muralidharan commented on SPARK-1606:


Crap, got to this too late.
We really should not have added this - and I would have -1'ed it.

We have had too many issues with trying to parse the user command line and 
getting into all sorts of problems which are entirely avoidable by simply being 
explicit about what the user wants to pass.
Trying to pass strings which contain escape characters and/or whitespace, etc. 
is just going to be a nightmare with this change.

 spark-submit needs `--arg` for every application parameter
 --

 Key: SPARK-1606
 URL: https://issues.apache.org/jira/browse/SPARK-1606
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Xiangrui Meng
Assignee: Patrick Wendell
Priority: Blocker
 Fix For: 1.0.0


 If the application has a few parameters, the spark-submit command looks like 
 the following:
 {code}
 spark-submit --master yarn-cluster --class main.Class --arg --numPartitions 
 --arg 8 --arg --kryo --arg true
 {code}
 It is a little bit hard to read and modify. Maybe it is okay to treat all 
 arguments after `main.Class` as application parameters.
 {code}
 spark-submit --master yarn-cluster --class main.Class --numPartitions 8 
 --kryo true
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1706) Allow multiple executors per worker in Standalone mode

2014-05-03 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988868#comment-13988868
 ] 

Mridul Muralidharan commented on SPARK-1706:


Oh my, this was supposed to be a logical addition once the yarn changes were done.
The yarn changes were very heavily modelled on standalone mode (hence the name 
yarn-standalone !) : and it was supposed to be a two-way street : changes made 
for yarn support (multi-tenancy, etc) were supposed to have been added back to 
standalone mode once yarn support stabilized.
Did not realize I never got around to it - my apologies !

 Allow multiple executors per worker in Standalone mode
 --

 Key: SPARK-1706
 URL: https://issues.apache.org/jira/browse/SPARK-1706
 Project: Spark
  Issue Type: Improvement
  Components: Deploy
Reporter: Patrick Wendell
 Fix For: 1.1.0


 Right now if people want to launch multiple executors on each machine they 
 need to start multiple standalone workers. This is not too difficult, but it 
 means you have extra JVM's sitting around.
 We should just allow users to set a number of cores they want per-executor in 
 standalone mode and then allow packing multiple executors on each node. This 
 would make standalone mode more consistent with YARN in the way you request 
 resources.
 It's not too big of a change as far as I can see. You'd need to:
 1. Introduce a configuration for how many cores you want per executor.
 2. Change the scheduling logic in Master.scala to take this into account.
 3. Change CoarseGrainedSchedulerBackend to not assume a 1-1 correspondence 
 between hosts and executors.
 And maybe modify a few other places.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1576) Passing of JAVA_OPTS to YARN on command line

2014-04-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981313#comment-13981313
 ] 

Mridul Muralidharan commented on SPARK-1576:


There is a misunderstanding here - the intent is to pass SPARK_JAVA_OPTS : not 
JAVA_OPTS.
Directly passing JAVA_OPTS has been removed.

 Passing of JAVA_OPTS to YARN on command line
 

 Key: SPARK-1576
 URL: https://issues.apache.org/jira/browse/SPARK-1576
 Project: Spark
  Issue Type: Improvement
Affects Versions: 0.9.0, 1.0.0, 0.9.1
Reporter: Nishkam Ravi
 Fix For: 0.9.0, 1.0.0, 0.9.1

 Attachments: SPARK-1576.patch


 JAVA_OPTS can be passed either via env variables (i.e., SPARK_JAVA_OPTS) 
 or as config vars (after Patrick's recent change). It would be good to allow 
 the user to pass them on the command line as well, to restrict the scope to a 
 single application invocation.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1586) Fix issues with spark development under windows

2014-04-25 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981321#comment-13981321
 ] 

Mridul Muralidharan commented on SPARK-1586:


Immediate issues are fixed, though there are more hive tests failing due to 
path-related issues. PR : https://github.com/apache/spark/pull/505

 Fix issues with spark development under windows
 ---

 Key: SPARK-1586
 URL: https://issues.apache.org/jira/browse/SPARK-1586
 Project: Spark
  Issue Type: Bug
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
 Fix For: 1.0.0






--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (SPARK-1587) Fix thread leak in spark

2014-04-25 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-1587.


Resolution: Fixed

Fixed, https://github.com/apache/spark/pull/504

 Fix thread leak in spark
 

 Key: SPARK-1587
 URL: https://issues.apache.org/jira/browse/SPARK-1587
 Project: Spark
  Issue Type: Bug
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan

 SparkContext.stop does not cause all threads to exit.
 When running tests via scalatest (which keeps reusing the same vm), over 
 time, this causes too many threads to be created causing tests to fail due to 
 inability to create more threads.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (BOOKKEEPER-560) Create readme for hedwig-client-jms

2014-04-25 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-560:
---

Assignee: (was: Mridul Muralidharan)

 Create readme for hedwig-client-jms 
 

 Key: BOOKKEEPER-560
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-560
 Project: Bookkeeper
  Issue Type: Task
  Components: Documentation
Reporter: Ivan Kelly

 This module needs a readme describing it as an experimental component and 
 what parts of jms are supported and not supported.
 It would also be good to have a bit more detail on why they can't be 
 supported with hedwig as it is today. A lot of this can be taken verbatim 
 from the package-info.html



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (BOOKKEEPER-648) BasicJMSTest failed

2014-04-25 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated BOOKKEEPER-648:
---

Assignee: (was: Mridul Muralidharan)

 BasicJMSTest failed
 ---

 Key: BOOKKEEPER-648
 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648
 Project: Bookkeeper
  Issue Type: Bug
  Components: hedwig-client
Reporter: Flavio Junqueira

 While running tests, I got once a failure for this hedwig-client-jms test: 
 BasicJMSTest.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1588) SPARK_JAVA_OPTS is not getting propagated

2014-04-23 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13978827#comment-13978827
 ] 

Mridul Muralidharan commented on SPARK-1588:


Apparently, SPARK_YARN_USER_ENV is also broken

 SPARK_JAVA_OPTS is not getting propagated
 -

 Key: SPARK-1588
 URL: https://issues.apache.org/jira/browse/SPARK-1588
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.0.0
Reporter: Mridul Muralidharan
Priority: Blocker

 We could use SPARK_JAVA_OPTS to pass JAVA_OPTS to be used in the master.
 This is no longer working in current master.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: all values for a key must fit in memory

2014-04-20 Thread Mridul Muralidharan
An iterator does not imply data has to be memory resident.
Think merge sort output as an iterator (disk backed).

Tom is actually planning to work on something similar with me on this
hopefully this or next month.

Regards,
Mridul


On Sun, Apr 20, 2014 at 11:46 PM, Sandy Ryza sandy.r...@cloudera.com wrote:
 Hey all,

 After a shuffle / groupByKey, Hadoop MapReduce allows the values for a key
 to not all fit in memory.  The current ShuffleFetcher.fetch API, which
 doesn't distinguish between keys and values, only returning an Iterator[P],
 seems incompatible with this.

 Any thoughts on how we could achieve parity here?

 -Sandy


[jira] [Commented] (SPARK-1524) TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process

2014-04-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972864#comment-13972864
 ] 

Mridul Muralidharan commented on SPARK-1524:


The expectation is to fall back to a previous schedule type in case the higher 
level is not valid : though this is tricky in the general case.
Will need to take a look at it - though given that I am tied up with other 
things, if someone else wants to take a crack at it, please feel free to do so !

Btw, use of IPs and multiple hostnames for a host is not supported in spark - 
so that is something that will need to be resolved at the deployment end.

 TaskSetManager'd better not schedule tasks which has no preferred executorId 
 using PROCESS_LOCAL in the first search process
 

 Key: SPARK-1524
 URL: https://issues.apache.org/jira/browse/SPARK-1524
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor

 ShuffleMapTask is constructed with a TaskLocation which has only a host, not a 
 (host, executorID) pair, in DAGScheduler.
 When TaskSetManager schedules a ShuffleMapTask which has no preferred 
 executorId using a specific execId host and the PROCESS_LOCAL locality level, no 
 tasks match the given locality constraint in the first search pass.
 We also find that the host used by the Scheduler is a hostname while the host used 
 by TaskLocation is an IP in our cluster. The two hosts do not match, which makes 
 the pendingTasksForHost HashMap empty and the task-finding process behave against 
 our expectation.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-04-17 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972978#comment-13972978
 ] 

Mridul Muralidharan commented on SPARK-1476:



[~matei] We are having some issues porting the netty shuffle copier code to 
support > 2G, since only ByteBuf seems to be exposed.
Before I dig into netty more, I wanted to know if you or someone else among the 
spark developers knew how to add support for large buffers in our netty code. 
Thanks !

 2GB limit in spark for blocks
 -

 Key: SPARK-1476
 URL: https://issues.apache.org/jira/browse/SPARK-1476
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
 Environment: all
Reporter: Mridul Muralidharan
Assignee: Mridul Muralidharan
Priority: Critical
 Fix For: 1.1.0


 The underlying abstraction for blocks in spark is a ByteBuffer : which limits 
 the size of the block to 2GB.
 This has implication not just for managed blocks in use, but also for shuffle 
 blocks (memory mapped blocks are limited to 2gig, even though the api allows 
 for long), ser-deser via byte array backed outstreams (SPARK-1391), etc.
 This is a severe limitation for use of spark when used on non trivial 
 datasets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting

2014-04-14 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13968390#comment-13968390
 ] 

Mridul Muralidharan commented on SPARK-1453:



(d) becomes relevant in case of headless/cron'ed jobs.
If the job is user initiated, then I agree, the user would typically kill and 
restart the job.

 Improve the way Spark on Yarn waits for executors before starting
 -

 Key: SPARK-1453
 URL: https://issues.apache.org/jira/browse/SPARK-1453
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.0.0
Reporter: Thomas Graves
Assignee: Thomas Graves

 Currently Spark on Yarn just delays a few seconds between when the spark 
 context is initialized and when it allows the job to start.  If you are on a 
 busy hadoop cluster is might take longer to get the number of executors. 
 In the very least we could make this timeout a configurable value.  Its 
 currently hardcoded to 3 seconds.  
 Better yet would be to allow user to give a minimum number of executors it 
 wants to wait for, but that looks much more complex. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (SPARK-1476) 2GB limit in spark for blocks

2014-04-13 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967854#comment-13967854
 ] 

Mridul Muralidharan edited comment on SPARK-1476 at 4/13/14 2:45 PM:
-

There are multiple issues at play here :

a) If a block goes beyond 2G, everything fails - this is the case for shuffle, 
cached and 'normal' blocks.
Irrespective of storage level and/or other config.
In a lot of cases, particularly when data generated 'increases' after a 
map/flatMap/etc, the user has no control over the size increase in the output 
block for a given input block (think iterations over datasets).

b) Increasing number of partitions is not always an option (for the subset of 
usecases where it can be done) :
1) It has an impact on number of intermediate files created while doing a 
shuffle. (ulimit constraints, IO performance issues, etc).
2) It does not help when there is skew anyway.

c) Compression codec, serializer used, etc have an impact.

d) 2G is an extremely low limit to have on modern hardware : and this is a 
particularly severe limitation when we have nodes running with 32G to 64G of RAM 
and TBs of disk space available for spark.


To address specific points raised above :

A) [~pwendell] MapReduce jobs don't fail in case the block size of files 
increases - it might be inefficient, but it still runs (not that I know of any case 
where it actually does, but theoretically I guess it can become inefficient).
So the analogy does not apply.
Also to add, 2G is not really an unlimited increase in block size - and in MR, 
the output of a map can easily go a couple of orders of magnitude above 2G : 
whether it is followed by a reduce or not.

B) [~matei] In the specific cases it was failing, the users were not caching 
the data but directly going to shuffle.
There was no skew from what I see : just the data size per key is high; and 
there are a lot of keys too btw (as iterations increase and nnz increases).
Note that it was an impl detail that it was not being cached - it could have 
been too.
Additionally, compression and/or serialization also apply implicitly in this 
case, since it was impacting shuffle - the 2G limit was observed at both the 
map and reduce side (in two different jobs).


In general, our effort is to make spark a drop-in replacement for most 
usecases which are currently being handled via MR/Pig/etc.
Limitations of this sort make it difficult to position spark as a credible 
alternative.


The current approach we are exploring is to remove all direct references to 
ByteBuffer from spark (except for the ConnectionManager, etc parts); and rely on a 
BlockData or similar datastructure which encapsulates the data corresponding to 
a block. By default, a single ByteBuffer should suffice, but in case it does 
not, the class will automatically take care of splitting across buffers.
Similarly, all references to byte array backed streams will need to be replaced 
with a wrapper stream which multiplexes over multiple byte array streams.
The performance impact for all 'normal' usecases should be minimal, while 
allowing spark to be used in cases where the 2G limit is being hit.

The only unknown here is tachyon integration : where the interface is a 
ByteBuffer - and I am not knowledgeable enough to comment on what the issues 
there would be.
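A bare-bones sketch of what such a BlockData-style wrapper could look like (the name, chunking and the absolute-get helper are all hypothetical; the real change would have to be threaded through BlockManager, DiskStore, shuffle, the netty transfer path, etc.):

{code}
import java.nio.ByteBuffer

// Hypothetical: a block backed by one or more ByteBuffers so its total size
// is no longer capped at 2GB.
class LargeBlockData(private val chunks: IndexedSeq[ByteBuffer]) {
  def size: Long = chunks.map(_.remaining.toLong).sum

  // Read the byte at a logical offset that may span chunk boundaries.
  def get(pos: Long): Byte = {
    var offset = pos
    var i = 0
    while (offset >= chunks(i).remaining) {
      offset -= chunks(i).remaining
      i += 1
    }
    chunks(i).get(chunks(i).position + offset.toInt)
  }
}

object LargeBlockData {
  // Split a byte array into <= chunkSize pieces (chunkSize would normally be
  // close to Int.MaxValue; smaller here only for illustration).
  def fromBytes(bytes: Array[Byte], chunkSize: Int = 64 * 1024 * 1024): LargeBlockData =
    new LargeBlockData(bytes.grouped(chunkSize).map(ByteBuffer.wrap).toIndexedSeq)
}
{code}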


was (Author: mridulm80):

There are multiple issues at play here :

a) If a block goes beyond 2G, everything fails - this is the case for shuffle, 
cached and 'normal' blocks.
Irrespective of storage level and/or other config.
In a lot of cases, particularly when data generated 'increases' after a 
map/flatMap/etc, the user has no control over the size increase in the output 
block for a given input block (think iterations over datasets).

b) Increasing number of partitions is not always an option (for the subset of 
usecases where it can be done) :
1) It has an impact on number of intermediate files created while doing a 
shuffle. (ulimit constraints, IO performance issues, etc).
2) It does not help when there is skew anyway.

c) Compression codec, serializer used, etc have an impact.

d) 2G is extremely low limit to have in modern hardware : and this is 
practically a severe limitation when we have nodes running on 32G to 64G ram 
and TB's of disk space available for spark.


To address specific points raised above :

A) [~pwendell] Mapreduce jobs dont fail in case the block size of files 
increases - it might be inefficient, it still runs (not that I know of any case 
where it does actually, but theoretically I guess it can).
So analogy does not apply.
To add, 2G is not really an unlimited increase in block size - and in MR, 
output of a map can easily go a couple of orders above 2G.

B) [~matei] In the specific cases it was failing, the users were not caching 
the data but directly going to shuffle.
There was no skew from what we see : just the data size per key is high

[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks

2014-04-12 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967419#comment-13967419
 ] 

Mridul Muralidharan commented on SPARK-1476:


WIP Proposal:

- All references to ByteBuffer will need to be replaced with Seq[ByteBuffer].
This applies to the definition of a block, memory mapped file segments for a 
shuffle block, etc.
- All use of byte array backed output streams will need to be replaced with an 
aggregating output stream which writes to multiple byte array output streams as 
and when the array limits are hit (see the sketch below).
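A minimal sketch of the aggregating output stream idea (the class name and chunk limit are hypothetical): roll over to a fresh byte array output stream whenever the current one reaches the limit, and expose the result as multiple ByteBuffers instead of one large array.

{code}
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

class ChunkedByteArrayOutputStream(chunkLimit: Int) extends OutputStream {
  private val chunks = ArrayBuffer(new ByteArrayOutputStream())

  // Return the current chunk, starting a new one once the limit is reached.
  private def current: ByteArrayOutputStream = {
    if (chunks.last.size >= chunkLimit) chunks += new ByteArrayOutputStream()
    chunks.last
  }

  override def write(b: Int): Unit = current.write(b)

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    var written = 0
    while (written < len) {
      val target = current
      val n = math.min(len - written, chunkLimit - target.size)
      target.write(b, off + written, n)
      written += n
    }
  }

  def toByteBuffers: Seq[ByteBuffer] = chunks.map(c => ByteBuffer.wrap(c.toByteArray)).toSeq
}
{code}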


 2GB limit in spark for blocks
 -

 Key: SPARK-1476
 URL: https://issues.apache.org/jira/browse/SPARK-1476
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
 Environment: all
Reporter: Mridul Muralidharan
Priority: Critical

 The underlying abstraction for blocks in spark is a ByteBuffer : which limits 
 the size of the block to 2GB.
 This has implication not just for managed blocks in use, but also for shuffle 
 blocks (memory mapped blocks are limited to 2gig, even though the api allows 
 for long), ser-deser via byte array backed outstreams (SPARK-1391), etc.
 This is a severe limitation for use of spark when used on non trivial 
 datasets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size

2014-04-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13966332#comment-13966332
 ] 

Mridul Muralidharan commented on SPARK-1391:


Another place where this is relevant is here :

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:413)
at 
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:339)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:506)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:233)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:52)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1262)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

So we might want to change the abstraction from a single ByteBuffer to a sequence 
of ByteBuffers ...
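The failure above comes from FileChannel.map being capped at Integer.MAX_VALUE bytes per mapped region. A workaround sketch (the helper name and chunk size are illustrative; a real fix would hide this behind the block abstraction) is to map a large file as a sequence of smaller regions:

{code}
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

def mapWholeFile(path: String, chunkSize: Long = Int.MaxValue.toLong): Seq[MappedByteBuffer] = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    val length = channel.size()
    // One read-only mapping per chunk-sized region of the file.
    (0L until length by chunkSize).map { offset =>
      channel.map(FileChannel.MapMode.READ_ONLY, offset, math.min(chunkSize, length - offset))
    }
  } finally {
    channel.close()   // the returned mappings remain valid after the channel is closed
  }
}
{code}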

 BlockManager cannot transfer blocks larger than 2G in size
 --

 Key: SPARK-1391
 URL: https://issues.apache.org/jira/browse/SPARK-1391
 Project: Spark
  Issue Type: Bug
  Components: Block Manager, Shuffle
Affects Versions: 1.0.0
Reporter: Shivaram Venkataraman
Assignee: Min Zhou
 Attachments: SPARK-1391.diff


 If a task tries to remotely access a cached RDD block, I get an exception 
 when the block size is > 2G. The exception is pasted below.
 Memory capacities are huge these days (> 60G), and many workflows depend on 
 having large blocks in memory, so it would be good to fix this bug.
 I don't know if the same thing happens on shuffles if one transfer (from 
 mapper to reducer) is > 2G.
 {noformat}
 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer 
 message
 java.lang.ArrayIndexOutOfBoundsException
 at 
 it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96)
 at 
 it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134)
 at 
 it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164)
 at 
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
 at 
 java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
 at 
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
 at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at 
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38)
 at 
 org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93)
 at 
 org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26)
 at 
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913)
 at 
 org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922)
 at 
 org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102)
 at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348)
 at 
 org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323)
 at 
 org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
 at 
 org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
 at 
 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
 at 
 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply

[jira] [Commented] (SPARK-542) Cache Miss when machine have multiple hostname

2014-04-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967185#comment-13967185
 ] 

Mridul Muralidharan commented on SPARK-542:
---

Spark uses only hostnames - not IPs.
Even for hostnames, it should ideally pick only the canonical hostname - not 
the others.

This was done by design in 0.8 ... trying to figure out whether multiple host 
names/IPs all refer to the same physical host/container is fraught with too many 
issues.
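For illustration only, the kind of normalization being discussed - collapsing an alias and its FQDN to one canonical key via reverse lookup - which is exactly the fragile step Spark avoids relying on:

{code}
import java.net.InetAddress

// Collapse an alias (e.g. "c001") and its FQDN ("c001.cm.cluster") to one
// canonical key before using it for locality lookups. This depends on
// reverse DNS behaving consistently, which is precisely the fragility the
// comment above warns about.
def canonicalHost(name: String): String =
  InetAddress.getByName(name).getCanonicalHostName

// canonicalHost("c001") and canonicalHost("c001.cm.cluster") would then
// (ideally) return the same string.
{code}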

 Cache Miss when machine have multiple hostname
 --

 Key: SPARK-542
 URL: https://issues.apache.org/jira/browse/SPARK-542
 Project: Spark
  Issue Type: Bug
Reporter: frankvictor

 Hi, I encountered weird PageRank runtimes in the last few days.
 After debugging the job, I found it was caused by the DNS name.
 The machines of my cluster have multiple hostnames; for example, slave 1 has 
 the names c001 and c001.cm.cluster.
 When spark adds a cache entry in cacheTracker, it gets c001 and registers the cache under it.
 But when scheduling a task in SimpleJob, the mesos offer gives spark 
 c001.cm.cluster,
 so it will never get the preferred location!
 I think spark should handle the multiple-hostname case (by using the IP instead 
 of the hostname, or some other method).
 Thanks!



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting

2014-04-11 Thread Mridul Muralidharan (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967193#comment-13967193
 ] 

Mridul Muralidharan commented on SPARK-1453:


The timeout gets hit only when we don't get the requested executors, right ? So it 
is more like a max timeout (controlled by the number of times we loop, iirc).
The reason for keeping it stupid was simply that we have no guarantees of the 
number of containers which might be available to spark in a busy cluster : at 
times, it might not be practically possible to even get a fraction of the 
requested nodes (either due to a busy cluster, or because of lack of resources - 
so an infinite wait).

Ideally, I should have exposed the number of containers allocated - so that 
at least user code could use it as an SPI and decide how to proceed in more 
complex cases. Missed out on this one.

I am not sure which usecases make sense.
a) Wait for X seconds or requested containers allocated.
b) Wait until minimum of Y containers allocated (out of X requested).
c) (b) with (a) - that is min containers and timeout on that.
d) (c) with exit if min containers not allocated ?

(d) is something which I keep hitting (if I don't get my required minimum 
nodes and the job proceeds, I usually end up bringing down those nodes :-( )
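As an application-level stopgap for (c)/(d), a rough sketch (the poll interval, the counts and the use of getExecutorStorageStatus as a proxy for registered executors are all assumptions) of waiting for a minimum number of executors with a timeout and bailing out otherwise:

{code}
import org.apache.spark.SparkContext

// getExecutorStorageStatus includes the driver, hence the "- 1".
def waitForExecutors(sc: SparkContext, minExecutors: Int, timeoutMs: Long): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  def registered: Int = sc.getExecutorStorageStatus.length - 1
  while (registered < minExecutors && System.currentTimeMillis() < deadline) {
    Thread.sleep(500)
  }
  registered >= minExecutors
}

// if (!waitForExecutors(sc, minExecutors = 20, timeoutMs = 120000L)) {
//   sc.stop()
//   sys.error("did not get the minimum number of executors, giving up")
// }
{code}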

 Improve the way Spark on Yarn waits for executors before starting
 -

 Key: SPARK-1453
 URL: https://issues.apache.org/jira/browse/SPARK-1453
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.0.0
Reporter: Thomas Graves
Assignee: Thomas Graves

 Currently Spark on Yarn just delays a few seconds between when the spark 
 context is initialized and when it allows the job to start.  If you are on a 
 busy hadoop cluster is might take longer to get the number of executors. 
 In the very least we could make this timeout a configurable value.  Its 
 currently hardcoded to 3 seconds.  
 Better yet would be to allow user to give a minimum number of executors it 
 wants to wait for, but that looks much more complex. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


ephemeral storage level in spark ?

2014-04-05 Thread Mridul Muralidharan
Hi,

  We have a requirement to use a (potentially) ephemeral storage which
is not within the VM but is strongly tied to a worker node. The
source of truth for a block would still be within spark; but to
actually do the computation, we would need to copy data to the external device
(where it might lie around for a while : so data locality really
really helps, since we can avoid a subsequent copy if the data is already
present when computing on the same block again).

I was wondering if the recently added storage level for tachyon would
help in this case (note, tachyon itself won't help; just the storage level
might).
What sort of guarantees does it provide ? How extensible is it ? Or is
it strongly tied to tachyon with only a generic name ?


Thanks,
Mridul
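For reference, from application code the new level is selected like any other storage level (assuming an existing RDD named rdd); whether the backing store behind OFF_HEAP is pluggable or hard-wired to tachyon is exactly the open question above:

{code}
import org.apache.spark.storage.StorageLevel

// In the current codebase the OFF_HEAP level is backed by tachyon
// (configured via the spark.tachyonStore.* settings); the level itself is
// just another constant on StorageLevel.
rdd.persist(StorageLevel.OFF_HEAP)
{code}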


Re: ephemeral storage level in spark ?

2014-04-05 Thread Mridul Muralidharan
No, I am thinking along the lines of writing to an accelerator card or a
dedicated card with its own memory.

Regards,
Mridul
On Apr 6, 2014 5:19 AM, Haoyuan Li haoyuan...@gmail.com wrote:

 Hi Mridul,

 Do you mean the scenario that different Spark applications need to read the
 same raw data, which is stored in a remote cluster or machines. And the
 goal is to load the remote raw data only once?

 Haoyuan


 On Sat, Apr 5, 2014 at 4:30 PM, Mridul Muralidharan mri...@gmail.com
 wrote:

  Hi,
 
We have a requirement to use a (potential) ephemeral storage, which
  is not within the VM, which is strongly tied to a worker node. So
  source of truth for a block would still be within spark; but to
  actually do computation, we would need to copy data to external device
  (where it might lie around for a while : so data locality really
  really helps if we can avoid a subsequent copy if it is already
  present on computations on same block again).
 
  I was wondering if the recently added storage level for tachyon would
  help in this case (note, tachyon wont help; just the storage level
  might).
  What sort of guarantees does it provide ? How extensible is it ? Or is
  it strongly tied to tachyon with only a generic name ?
 
 
  Thanks,
  Mridul
 



 --
 Haoyuan Li
 Algorithms, Machines, People Lab, EECS, UC Berkeley
 http://www.cs.berkeley.edu/~haoyuan/



JIRA. github and asf updates

2014-03-29 Thread Mridul Muralidharan
Hi,

  So we are now receiving updates from three sources for each change to a PR.
While each of them handles a corner case which the others might miss,
it would be great if we could minimize the volume of duplicated
communication.


Regards,
Mridul


Re: JIRA. github and asf updates

2014-03-29 Thread Mridul Muralidharan
If the PR comments are going to be replicated into the jiras and they
are going to be sent to dev@, then we could keep that and remove the
[Github] updates ?
The latter was added since discussions were happening off the apache lists -
which should now be handled by the jira updates ?

I wouldn't mind the mails if they had distinct content - this is just duplication
of the same message in three mails :-)
Btw, this is a good problem to have - a vibrant and very actively
engaged community generating a lot of meaningful traffic !
I just don't want to get distracted from it by repetitions.

Regards,
Mridul


On Sat, Mar 29, 2014 at 11:46 PM, Patrick Wendell pwend...@gmail.com wrote:
 Ah sorry I see - Jira updates are going to the dev list. Maybe that's not
 desirable. I think we should send them to the issues@ list.


 On Sat, Mar 29, 2014 at 11:16 AM, Patrick Wendell pwend...@gmail.comwrote:

 Mridul,

 You can unsubscribe yourself from any of these sources, right?

 - Patrick


 On Sat, Mar 29, 2014 at 11:05 AM, Mridul Muralidharan 
 mri...@gmail.comwrote:

 Hi,

   So we are now receiving updates from three sources for each change to
 the PR.
 While each of them handles a corner case which others might miss,
 would be great if we could minimize the volume of duplicated
 communication.


 Regards,
 Mridul





Re: Spark 0.9.1 release

2014-03-25 Thread Mridul Muralidharan
On Wed, Mar 26, 2014 at 10:53 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 PR 159 seems like a fairly big patch to me. And quite recent, so its impact
 on the scheduling is not clear. It may also depend on other changes that
 may have gotten into the DAGScheduler but not pulled into branch 0.9. I am
 not sure it is a good idea to pull that in. We can pull those changes later
 for 0.9.2 if required.


There is no impact on scheduling : it only has an impact on error
handling - it ensures that you can actually use spark on yarn in
multi-tenant clusters more reliably.
Currently, any reasonably long-running job (30 mins+) working on a non-trivial
dataset will fail due to accumulated failures in spark.


Regards,
Mridul



 TD




 On Tue, Mar 25, 2014 at 8:44 PM, Mridul Muralidharan mri...@gmail.comwrote:

 Forgot to mention this in the earlier request for PR's.
 If there is another RC being cut, please add
 https://github.com/apache/spark/pull/159 to it too (if not done
 already !).

 Thanks,
 Mridul

 On Thu, Mar 20, 2014 at 5:37 AM, Tathagata Das
 tathagata.das1...@gmail.com wrote:
   Hello everyone,
 
  Since the release of Spark 0.9, we have received a number of important
 bug
  fixes and we would like to make a bug-fix release of Spark 0.9.1. We are
  going to cut a release candidate soon and we would love it if people test
  it out. We have backported several bug fixes into the 0.9 and updated
 JIRA
  accordingly
 https://spark-project.atlassian.net/browse/SPARK-1275?jql=project%20in%20(SPARK%2C%20BLINKDB%2C%20MLI%2C%20MLLIB%2C%20SHARK%2C%20STREAMING%2C%20GRAPH%2C%20TACHYON)%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20(Resolved%2C%20Closed)
 .
  Please let me know if there are fixes that were not backported but you
  would like to see them in 0.9.1.
 
  Thanks!
 
  TD



Re: Spark 0.9.1 release

2014-03-19 Thread Mridul Muralidharan
It would be great if the garbage collection PR is also committed - if not
the whole thing, at least the part to unpersist broadcast variables
explicitly.
Currently we are running with a custom impl which does something
similar, and I would like to move to the standard distribution for that.


Thanks,
Mridul


On Wed, Mar 19, 2014 at 5:07 PM, Tathagata Das
tathagata.das1...@gmail.com wrote:
  Hello everyone,

 Since the release of Spark 0.9, we have received a number of important bug
 fixes and we would like to make a bug-fix release of Spark 0.9.1. We are
 going to cut a release candidate soon and we would love it if people test
 it out. We have backported several bug fixes into the 0.9 and updated JIRA
 accordinglyhttps://spark-project.atlassian.net/browse/SPARK-1275?jql=project%20in%20(SPARK%2C%20BLINKDB%2C%20MLI%2C%20MLLIB%2C%20SHARK%2C%20STREAMING%2C%20GRAPH%2C%20TACHYON)%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20(Resolved%2C%20Closed).
 Please let me know if there are fixes that were not backported but you
 would like to see them in 0.9.1.

 Thanks!

 TD


Re: Spark 0.9.1 release

2014-03-19 Thread Mridul Muralidharan
If 1.0 is just around the corner, then it is fair enough to push this to
that release - thanks for clarifying !

Regards,
Mridul

On Wed, Mar 19, 2014 at 6:12 PM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 I agree that the garbage collection
 PRhttps://github.com/apache/spark/pull/126would make things very
 convenient in a lot of usecases. However, there are
 two broads reasons why it is hard for that PR to get into 0.9.1.
 1. The PR still needs some amount of work and quite a lot of testing. While
 we enable RDD and shuffle cleanup based on Java GC, its behavior in a real
 workloads still needs to be understood (especially since it is tied to
 Spark driver's garbage collection behavior).
 2. This actually changes some of the semantic behavior of Spark and should
 not be included in a bug-fix release. The PR will definitely be present for
 Spark 1.0, which is expected to be release around end of April (not too far
 ;) ).

 TD


 On Wed, Mar 19, 2014 at 5:57 PM, Mridul Muralidharan mri...@gmail.comwrote:

 Would be great if the garbage collection PR is also committed - if not
 the whole thing, atleast the part to unpersist broadcast variables
 explicitly would be great.
 Currently we are running with a custom impl which does something
 similar, and I would like to move to standard distribution for that.


 Thanks,
 Mridul


 On Wed, Mar 19, 2014 at 5:07 PM, Tathagata Das
 tathagata.das1...@gmail.com wrote:
   Hello everyone,
 
  Since the release of Spark 0.9, we have received a number of important
 bug
  fixes and we would like to make a bug-fix release of Spark 0.9.1. We are
  going to cut a release candidate soon and we would love it if people test
  it out. We have backported several bug fixes into the 0.9 and updated
 JIRA
  accordingly
 https://spark-project.atlassian.net/browse/SPARK-1275?jql=project%20in%20(SPARK%2C%20BLINKDB%2C%20MLI%2C%20MLLIB%2C%20SHARK%2C%20STREAMING%2C%20GRAPH%2C%20TACHYON)%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20(Resolved%2C%20Closed)
 .
  Please let me know if there are fixes that were not backported but you
  would like to see them in 0.9.1.
 
  Thanks!
 
  TD



Re: Fwd: ASF Board Meeting Summary - February 19, 2014

2014-02-20 Thread Mridul Muralidharan
Wonderful news ! Congrats all :-)

Regards,
Mridul
On Feb 20, 2014 10:07 PM, Andy Konwinski andykonwin...@gmail.com wrote:

 Congrats Spark community! I think this means we are officially now a TLP!
 -- Forwarded message --
 From: Brett Porter chair...@apache.org
 Date: Feb 19, 2014 11:26 PM
 Subject: ASF Board Meeting Summary - February 19, 2014
 To: committ...@apache.org
 Cc:

 The February board meeting took place on the 19th.

 The following directors were present:

   Shane Curcuru
   Bertrand Delacretaz
   Roy T. Fielding
   Jim Jagielski
   Chris Mattmann
   Brett Porter
   Greg Stein

 Apologies were received from Sam Ruby.

 The following officers were present:

   Ross Gardler
   Rich Bowen
   Craig L Russell

 The following guests were present:

   Sean Kelly
   Daniel Gruno
   Phil Steitz
   Jake Farrell
   Marvin Humphrey
   David Nalley
   Noah Slater

 The January minutes were approved.
 Minutes will be posted to
 http://www.apache.org/foundation/records/minutes/

 The following reports were not approved and are expected next month:

  Report from the Apache Lenya Project  [Richard Frovarp]

 The following reports were not received and are expected next month:

   Report from the Apache Abdera Project  [Ant Elder]
   Report from the Apache Buildr Project  [Alex Boisvert]
   Report from the Apache Click Project  [Malcolm Edgar]
   Report from the Apache Community Development Project  [Luciano Resende]
   Report from the Apache Continuum Project  [Brent Atkinson]
   Report from the Apache Creadur Project  [Robert Burrell Donkin]
   Report from the Apache DirectMemory Project  [Raffaele P. Guidi]
   Report from the Apache Giraph Project  [Avery Ching]
   Report from the Apache Velocity Project  [Nathan Bubna]

 All other reports to the board were approved.

 The following resolutions were passed unanimously:

   A. Establish the Apache Open Climate Workbench Project (Michael Joyce,
 VP)
   B. Change the Apache Incubator Project Chair (Roman Shaposhnik, VP)
   C. Establish the Apache Spark Project (Matei Zaharia, VP)
   D. Establish the Apache Knox Project (Kevin Minder, VP)

 The next board meeting will be on the 19th of March.



Re: coding style discussion: explicit return type in public APIs

2014-02-19 Thread Mridul Muralidharan
You are right.
A degenerate case would be :

def createFoo = new FooImpl()

vs

def createFoo: Foo = new FooImpl()

The former will cause API instability. Reynold, maybe this is already
avoided - and I understood it wrong ?

Thanks,
Mridul
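A compact illustration of the instability concern, using the toy names from this thread: with the return type inferred, the public signature is the implementation type, so changing the implementation later changes the API even though callers only ever needed Foo.

{code}
trait Foo
class FooImpl extends Foo
class Foo2 extends Foo

object Factory {
  // Inferred signature is "def createFoo: FooImpl". Changing the body to
  // "new Foo2()" later silently changes the public signature - a source and
  // binary incompatibility - even though callers only ever needed a Foo.
  def createFoo = new FooImpl()

  // With the return type declared, the implementation can change freely.
  def createFooStable: Foo = new FooImpl()
}
{code}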



On Wed, Feb 19, 2014 at 12:44 PM, Christopher Nguyen c...@adatao.com wrote:
 Mridul, IIUUC, what you've mentioned did come to mind, but I deemed it
 orthogonal to the stylistic issue Reynold is talking about.

 I believe you're referring to the case where there is a specific desired
 return type by API design, but the implementation does not, in which case,
 of course, one must define the return type. That's an API requirement and
 not just a matter of readability.

 We could add this as an NB in the proposed guideline.

 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, Feb 18, 2014 at 10:40 PM, Reynold Xin r...@databricks.com wrote:

 +1 Christopher's suggestion.

 Mridul,

 How would that happen? Case 3 requires the method to be invoking the
 constructor directly. It was implicit in my email, but the return type
 should be the same as the class itself.




 On Tue, Feb 18, 2014 at 10:37 PM, Mridul Muralidharan mri...@gmail.com
 wrote:

  Case 3 can be a potential issue.
  Current implementation might be returning a concrete class which we
  might want to change later - making it a type change.
  The intention might be to return an RDD (for example), but the
  inferred type might be a subclass of RDD - and future changes will
  cause signature change.
 
 
  Regards,
  Mridul
 
 
  On Wed, Feb 19, 2014 at 11:52 AM, Reynold Xin r...@databricks.com
 wrote:
   Hi guys,
  
   Want to bring to the table this issue to see what other members of the
   community think and then we can codify it in the Spark coding style
  guide.
   The topic is about declaring return types explicitly in public APIs.
  
   In general I think we should favor explicit type declaration in public
   APIs. However, I do think there are 3 cases we can avoid the public API
   definition because in these 3 cases the types are self-evident 
  repetitive.
  
   Case 1. toString
  
   Case 2. A method returning a string or a val defining a string
  
   def name = abcd // this is so obvious that it is a string
   val name = edfg // this too
  
   Case 3. The method or variable is invoking the constructor of a class
 and
   return that immediately. For example:
  
   val a = new SparkContext(...)
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new
   AsyncRDDActions(rdd)
  
  
   Thoughts?
 



Re: coding style discussion: explicit return type in public APIs

2014-02-19 Thread Mridul Muralidharan
Without bikeshedding this too much ... It is likely incorrect (not wrong) -
and rules like this potentially cause things to slip through.

Explicit return type strictly specifies what is being exposed (think in
face of impl change - createFoo changes in future from Foo to Foo1 or Foo2)
.. being conservative about how to specify exposed interfaces, imo,
outweighs potential gains in breveity of code.
Btw this is a degenerate contrieved example already stretching its use ...

Regards
Mridul

Regards
Mridul
On Feb 19, 2014 1:49 PM, Reynold Xin r...@databricks.com wrote:

 Yes, the case you brought up is not a matter of readability or style. If it
 returns a different type, it should be declared (otherwise it is just
 wrong).


 On Wed, Feb 19, 2014 at 12:17 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

  You are right.
  A degenerate case would be :
 
  def createFoo = new FooImpl()
 
  vs
 
  def createFoo: Foo = new FooImpl()
 
  Former will cause api instability. Reynold, maybe this is already
  avoided - and I understood it wrong ?
 
  Thanks,
  Mridul
 
 
 
  On Wed, Feb 19, 2014 at 12:44 PM, Christopher Nguyen c...@adatao.com
  wrote:
   Mridul, IIUUC, what you've mentioned did come to mind, but I deemed it
   orthogonal to the stylistic issue Reynold is talking about.
  
   I believe you're referring to the case where there is a specific
 desired
   return type by API design, but the implementation does not, in which
  case,
   of course, one must define the return type. That's an API requirement
 and
   not just a matter of readability.
  
   We could add this as an NB in the proposed guideline.
  
   --
   Christopher T. Nguyen
   Co-founder  CEO, Adatao http://adatao.com
   linkedin.com/in/ctnguyen
  
  
  
   On Tue, Feb 18, 2014 at 10:40 PM, Reynold Xin r...@databricks.com
  wrote:
  
   +1 Christopher's suggestion.
  
   Mridul,
  
   How would that happen? Case 3 requires the method to be invoking the
   constructor directly. It was implicit in my email, but the return type
   should be the same as the class itself.
  
  
  
  
   On Tue, Feb 18, 2014 at 10:37 PM, Mridul Muralidharan 
 mri...@gmail.com
   wrote:
  
Case 3 can be a potential issue.
Current implementation might be returning a concrete class which we
might want to change later - making it a type change.
The intention might be to return an RDD (for example), but the
inferred type might be a subclass of RDD - and future changes will
cause signature change.
   
   
Regards,
Mridul
   
   
On Wed, Feb 19, 2014 at 11:52 AM, Reynold Xin r...@databricks.com
   wrote:
 Hi guys,

 Want to bring to the table this issue to see what other members of
  the
 community think and then we can codify it in the Spark coding
 style
guide.
 The topic is about declaring return types explicitly in public
 APIs.

 In general I think we should favor explicit type declaration in
  public
 APIs. However, I do think there are 3 cases we can avoid the
 public
  API
 definition because in these 3 cases the types are self-evident &
repetitive.

 Case 1. toString

 Case 2. A method returning a string or a val defining a string

 def name = "abcd" // this is so obvious that it is a string
 val name = "edfg" // this too

 Case 3. The method or variable is invoking the constructor of a
  class
   and
 return that immediately. For example:

 val a = new SparkContext(...)
 implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new
 AsyncRDDActions(rdd)


 Thoughts?
   
  
 



Re: coding style discussion: explicit return type in public APIs

2014-02-19 Thread Mridul Muralidharan
My initial mail had it listed; adding more details here since I assume I am
missing something or not being clear - please note, this is just
illustrative and my Scala knowledge is bad :-) (I am trying to draw
parallels from mistakes in the Java world.)

def createFoo = new Foo()

To

def createFoo = new Foo1()

To

def createFoo = new Foo2()

(appropriate inheritance applied - parent Foo).

I am thinking from an API evolution and binary compatibility point of view.

Regards,
Mridul
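
A sketch of the evolution described above, keeping the Foo/Foo1/Foo2 names
from the email (the version comments are illustrative assumptions):

trait Foo
class Foo1 extends Foo
class Foo2 extends Foo

object Factory {
  // Version 1 shipped with an inferred type, i.e. the signature was
  // createFoo: Foo1:
  //   def createFoo = new Foo1
  // Version 2 swaps the implementation; the inferred signature silently
  // becomes createFoo: Foo2, breaking code compiled against version 1:
  //   def createFoo = new Foo2

  // With an explicit return type the signature stays createFoo: Foo in every
  // version, so the implementation can move from Foo1 to Foo2 safely.
  def createFoo: Foo = new Foo2
}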
On Feb 20, 2014 12:12 AM, Reynold Xin r...@databricks.com wrote:

 Mridul,

 Can you be more specific in the createFoo example?

 def myFunc = createFoo

 is disallowed in my guideline. It is invoking a function createFoo, not the
 constructor of Foo.




 On Wed, Feb 19, 2014 at 10:39 AM, Mridul Muralidharan mri...@gmail.com
 wrote:

  Without bikeshedding this too much ... It is likely incorrect (not wrong) -
  and rules like this potentially cause things to slip through.
 
  An explicit return type strictly specifies what is being exposed (think in
  the face of impl change - createFoo changes in future from Foo to Foo1 or
  Foo2); being conservative about how to specify exposed interfaces, imo,
  outweighs potential gains in brevity of code.
  Btw, this is a degenerate, contrived example already stretching its use ...
 
  Regards
  Mridul
 
  Regards
  Mridul
  On Feb 19, 2014 1:49 PM, Reynold Xin r...@databricks.com wrote:
 
   Yes, the case you brought up is not a matter of readability or style.
 If
  it
   returns a different type, it should be declared (otherwise it is just
   wrong).
  
  
   On Wed, Feb 19, 2014 at 12:17 AM, Mridul Muralidharan 
 mri...@gmail.com
   wrote:
  
You are right.
A degenerate case would be :
   
def createFoo = new FooImpl()
   
vs
   
def createFoo: Foo = new FooImpl()
   
The former will cause API instability. Reynold, maybe this is already
avoided - and I understood it wrong?
   
Thanks,
Mridul
   
   
   
On Wed, Feb 19, 2014 at 12:44 PM, Christopher Nguyen c...@adatao.com
 
wrote:
 Mridul, IIUUC, what you've mentioned did come to mind, but I deemed
  it
 orthogonal to the stylistic issue Reynold is talking about.

 I believe you're referring to the case where there is a specific
   desired
 return type by API design, but the implementation does not, in
 which
case,
 of course, one must define the return type. That's an API
 requirement
   and
 not just a matter of readability.

 We could add this as an NB in the proposed guideline.

 --
 Christopher T. Nguyen
 Co-founder & CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, Feb 18, 2014 at 10:40 PM, Reynold Xin r...@databricks.com
 
wrote:

 +1 Christopher's suggestion.

 Mridul,

 How would that happen? Case 3 requires the method to be invoking
 the
 constructor directly. It was implicit in my email, but the return
  type
 should be the same as the class itself.




 On Tue, Feb 18, 2014 at 10:37 PM, Mridul Muralidharan 
   mri...@gmail.com
 wrote:

  Case 3 can be a potential issue.
  Current implementation might be returning a concrete class which
  we
  might want to change later - making it a type change.
  The intention might be to return an RDD (for example), but the
  inferred type might be a subclass of RDD - and future changes
 will
  cause signature change.
 
 
  Regards,
  Mridul
 
 
  On Wed, Feb 19, 2014 at 11:52 AM, Reynold Xin 
  r...@databricks.com
 wrote:
   Hi guys,
  
   Want to bring to the table this issue to see what other
 members
  of
the
   community think and then we can codify it in the Spark coding
   style
  guide.
   The topic is about declaring return types explicitly in public
   APIs.
  
   In general I think we should favor explicit type declaration
 in
public
   APIs. However, I do think there are 3 cases we can avoid the
   public
API
   definition because in these 3 cases the types are
 self-evident &
  repetitive.
  
   Case 1. toString
  
   Case 2. A method returning a string or a val defining a string
  
    def name = "abcd" // this is so obvious that it is a string
    val name = "edfg" // this too
  
   Case 3. The method or variable is invoking the constructor of
 a
class
 and
   return that immediately. For example:
  
   val a = new SparkContext(...)
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) =
  new
   AsyncRDDActions(rdd)
  
  
   Thoughts?
 

   
  
 


