[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14099115#comment-14099115 ] Mridul Muralidharan commented on SPARK-2089:
In the general case, won't InputFormats have customizations applied to them for creation and/or initialization before they can be used to get splits (other than file names, I mean)?
With YARN, preferredNodeLocalityData isn't honored
--- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Priority: Critical
When running in YARN cluster mode, apps can pass preferred locality data when constructing a SparkContext that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code that the context is ready. The Spark-YARN code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty, unset version. This occurred during all of my runs.
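A minimal sketch of the race described above (hypothetical names, not the actual SparkContext internals): the field is assigned only after the notification fires, so a fast reader observes the empty default.
{code:title=PreferredLocalityRaceSketch.scala}
import scala.collection.Map
import scala.collection.Set

// Hypothetical, simplified sketch of the race -- not the real SparkContext code.
class SketchContext(preferred: Map[String, Set[String]]) {
  // Starts out empty; only overwritten near the end of construction.
  var preferredNodeLocationData: Map[String, Set[String]] = Map.empty

  // ... scheduler creation happens here and wakes up the Spark-YARN monitor ...
  notifyYarnMonitor()

  // The assignment comes after the notification, so a monitor thread that reads
  // preferredNodeLocationData immediately after waking up sees the empty map.
  preferredNodeLocationData = preferred

  private def notifyYarnMonitor(): Unit = { /* signal the allocator thread */ }
}
{code}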
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14095247#comment-14095247 ] Mridul Muralidharan commented on SPARK-2089:
Since I am not maintaining the code anymore, I don't have a strong preference either way. I am not sure what the format means, by the way - I see multiple nodes and racks mentioned in the same group ... In general, though, I am not convinced it is a good direction to take.
1) It is a workaround for a design issue and has non-trivial performance implications (serializing into this form only to immediately deserialize it is expensive for large inputs; not to mention, it gets shipped to executors for no reason).
2) It locks us into a format which provides inadequate information - the number of blocks per node, size per block, etc. is lost (or maybe I just did not understand what the format is!).
3) We are currently investigating evolving in the opposite direction - adding more information so that we can be more specific about where to allocate executors. For example: I can see the fairly near-term need to associate executors with accelerator cards (and break the OFF_HEAP - Tachyon implicit assumption). A string representation makes this fragile to evolve.
As I mentioned before, the current YARN allocation model in Spark is a very naive implementation - which I did not expect to survive this long: it came directly from our prototype. We really should be modifying it to consider the cost of data transfer and prioritize allocation that way (number of blocks on a node/rack, size of blocks, number of replicas available, etc.). For small datasets on small enough clusters this is not relevant, but it has implications as we grow along both axes.
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092807#comment-14092807 ] Mridul Muralidharan commented on SPARK-2962:
On further investigation:
a) The primary issue is a combination of SPARK-2089 and the current scheduler behavior for pendingTasksWithNoPrefs. SPARK-2089 leads to very bad allocation of nodes - it particularly has an impact on bigger clusters. It leaves a lot of blocks with no data-local or rack-local executors, causing their tasks to end up in pendingTasksWithNoPrefs. While loading data off DFS, when an executor is being scheduled, even though there might be rack-local schedules available for it (or, after waiting a while, data-local too - see (b) below), the current scheduler behavior means tasks from pendingTasksWithNoPrefs get scheduled instead, causing a large number of ANY tasks to be scheduled at the very onset. The combination of these, with the lack of marginal alleviation via (b), is what caused the performance impact.
b) spark.scheduler.minRegisteredExecutorsRatio had not yet been used in the workload - so that might alleviate some of the non-deterministic waiting and ensure adequate executors are allocated!
Thanks [~lirui]
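A small, purely illustrative sketch of the scheduling order described in (a) - hypothetical names, not the actual TaskSetManager code: tasks whose preferred hosts are merely not alive yet sit in the no-prefs bucket and are handed out at PROCESS_LOCAL before rack-local candidates are even considered.
{code:title=NoPrefSketch.scala}
import scala.collection.mutable.ArrayBuffer

object NoPrefSketch {
  sealed trait Locality
  case object ProcessLocal extends Locality
  case object NodeLocal extends Locality
  case object RackLocal extends Locality

  case class Task(id: Int, preferredHosts: Seq[String])

  // Simplified stand-in for findTask: the no-prefs bucket is consulted before
  // rack-local tasks, and its tasks are labelled PROCESS_LOCAL.
  def findTask(aliveHosts: Set[String],
               nodeLocal: ArrayBuffer[Task],
               noPrefs: ArrayBuffer[Task],
               rackLocal: ArrayBuffer[Task]): Option[(Task, Locality)] = {
    nodeLocal.find(_.preferredHosts.exists(aliveHosts)).map(t => (t, NodeLocal: Locality))
      // Tasks whose preferred hosts are not alive *yet* land in noPrefs and are
      // returned as PROCESS_LOCAL, ahead of genuine rack-local schedules.
      .orElse(noPrefs.headOption.map(t => (t, ProcessLocal: Locality)))
      .orElse(rackLocal.headOption.map(t => (t, RackLocal: Locality)))
  }
}
{code}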
[jira] [Created] (SPARK-2962) Suboptimal scheduling in spark
Mridul Muralidharan created SPARK-2962: -- Summary: Suboptimal scheduling in spark Key: SPARK-2962 URL: https://issues.apache.org/jira/browse/SPARK-2962 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan
In findTask, irrespective of the 'locality' specified, pendingTasksWithNoPrefs tasks are always scheduled with PROCESS_LOCAL. pendingTasksWithNoPrefs contains tasks which currently do not have any alive locations - but which could come in 'later': particularly relevant when the Spark app is just coming up and containers are still being added. This causes a large number of non-node-local tasks to be scheduled, incurring significant network transfers in the cluster when running with non-trivial datasets. The comment "// Look for no-pref tasks after rack-local tasks since they can run anywhere." is misleading in the method code: locality levels start from PROCESS_LOCAL down to ANY, and so no-pref tasks get scheduled much before rack-local ones. Also note that currentLocalityIndex is reset to the taskLocality returned by this method - so returning PROCESS_LOCAL as the level will trigger wait times again. (Was relevant before a recent change to the scheduler, and might be again depending on the resolution of this issue.) Found as part of writing a test for SPARK-2931
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092427#comment-14092427 ] Mridul Muralidharan commented on SPARK-2962:
To give more context:
a) Our jobs start by loading data from DFS, so this is the first stage that gets executed.
b) We are sleeping for 1 minute before starting the jobs (in case the cluster is busy, etc.) - unfortunately, this is not sufficient, and iirc there is no programmatic way to wait more deterministically for X% of nodes (was something added to alleviate this? I did see some discussion).
c) This becomes more of a problem because Spark does not honour preferred locations anymore while running on YARN. See SPARK-2089 - due to 1.0 interface changes.
[ Practically, if we are using a large enough number of nodes (with replication of 3 or higher), we usually do end up with quite a lot of data-local tasks eventually - so (c) is not an immediate concern for our current jobs assuming (b) is not an issue, though it is suboptimal in the general case ]
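Related to (b) above, a hedged example of replacing a fixed sleep with the setting mentioned in the earlier comment. The first property name appears in that comment; the companion wait-time property is an assumption of this example, so verify both against the running version.
{code:title=WaitForExecutorsExample.scala}
import org.apache.spark.SparkConf

object WaitForExecutorsExample {
  def main(args: Array[String]): Unit = {
    // Master/deploy mode come from spark-submit; only the wait-related settings are shown here.
    val conf = new SparkConf()
      .setAppName("wait-for-executors")
      // Wait until 80% of the requested executors have registered before scheduling tasks ...
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
      // ... but give up and start anyway after 60 seconds (value in milliseconds).
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "60000")
    println(conf.toDebugString)
  }
}
{code}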
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092430#comment-14092430 ] Mridul Muralidharan commented on SPARK-2962:
Hi [~matei], I am referencing the latest code (as of yesterday night). pendingTasksWithNoPrefs currently contains both tasks which truly have no preference, and tasks whose preferred locations are unavailable - and the latter is what is triggering this, since that can change during the execution of the stage. Hope I am not missing something? Thanks, Mridul
[jira] [Commented] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092431#comment-14092431 ] Mridul Muralidharan commented on SPARK-2962:
Note, I don't think this is a regression in 1.1; it probably existed much earlier too. Other issues (like SPARK-2089) are making us notice it - we moved to 1.1 from 0.9 recently.
[jira] [Comment Edited] (SPARK-2962) Suboptimal scheduling in spark
[ https://issues.apache.org/jira/browse/SPARK-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14092427#comment-14092427 ] Mridul Muralidharan edited comment on SPARK-2962 at 8/11/14 4:35 AM (the edit corrects the issue reference in (c) from SPARK-208 to SPARK-2089):
To give more context:
a) Our jobs start by loading data from DFS, so this is the first stage that gets executed.
b) We are sleeping for 1 minute before starting the jobs (in case the cluster is busy, etc.) - unfortunately, this is not sufficient, and iirc there is no programmatic way to wait more deterministically for X% of nodes (was something added to alleviate this? I did see some discussion).
c) This becomes more of a problem because Spark does not honour preferred locations anymore while running on YARN. See SPARK-2089 - due to 1.0 interface changes.
[ Practically, if we are using a large enough number of nodes (with replication of 3 or higher), we usually do end up with quite a lot of data-local tasks eventually - so (c) is not an immediate concern for our current jobs assuming (b) is not an issue, though it is suboptimal in the general case ]
[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091746#comment-14091746 ] Mridul Muralidharan commented on SPARK-2931: [~kayousterhout] this is weird, I remember mentioned this exact same issue in some PR for 1.1 (trying to find which one, though not 1313 iirc); and I think it was supposed to have been addressed. We had observed this issue of currentLocalityLevel running away when we had internally merged the pr. Strange that it was not addressed, speaks volumes of me not following up on my reviews ! getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException --- Key: SPARK-2931 URL: https://issues.apache.org/jira/browse/SPARK-2931 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf benchmark Reporter: Josh Rosen Priority: Blocker Fix For: 1.1.0 When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, I get the following errors (one per task): {code} 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 bytes) 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036] with ID 0 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1 java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} This causes the job to hang. I can deterministically reproduce this by re-running the test, either in isolation or as part of the full performance testing suite. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
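A hedged sketch of the failure mode behind the exception above (hypothetical names, not the actual TaskSetManager code): the array of valid locality levels is recomputed as executors come and go, but a stale index into the old, longer array survives, so the lookup can run past the end of the new array.
{code:title=LocalityIndexSketch.scala}
object LocalityIndexSketch {
  def main(args: Array[String]): Unit = {
    var levels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
    var currentIndex = 3                      // currently pointing at ANY

    levels = Array("PROCESS_LOCAL", "ANY")    // executor set changed; levels recomputed
    // println(levels(currentIndex))          // would throw ArrayIndexOutOfBoundsException: 3

    // One defensive fix is to clamp (or recompute) the index whenever the levels change:
    currentIndex = math.min(currentIndex, levels.length - 1)
    println(levels(currentIndex))             // ANY
  }
}
{code}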
[jira] [Updated] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-2931: --- Attachment: test.patch A patch to showcase the exception getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException --- Key: SPARK-2931 URL: https://issues.apache.org/jira/browse/SPARK-2931 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf benchmark Reporter: Josh Rosen Priority: Blocker Fix For: 1.1.0 Attachments: scala-sort-by-key.err, test.patch When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, I get the following errors (one per task): {code} 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 bytes) 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036] with ID 0 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1 java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} This causes the job to hang. I can deterministically reproduce this by re-running the test, either in isolation or as part of the full performance testing suite. 
-- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2931) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-2931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14091881#comment-14091881 ] Mridul Muralidharan commented on SPARK-2931: [~joshrosen] [~kayousterhout] Added a patch which deterministically showcases the bug - should be easy to fix it now I hope :-) getAllowedLocalityLevel() throws ArrayIndexOutOfBoundsException --- Key: SPARK-2931 URL: https://issues.apache.org/jira/browse/SPARK-2931 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: Spark EC2, spark-1.1.0-snapshot1, sort-by-key spark-perf benchmark Reporter: Josh Rosen Priority: Blocker Fix For: 1.1.0 Attachments: scala-sort-by-key.err, test.patch When running Spark Perf's sort-by-key benchmark on EC2 with v1.1.0-snapshot, I get the following errors (one per task): {code} 14/08/08 18:54:22 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 39, ip-172-31-14-30.us-west-2.compute.internal, PROCESS_LOCAL, 1003 bytes) 14/08/08 18:54:22 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@ip-172-31-9-213.us-west-2.compute.internal:58901/user/Executor#1436065036] with ID 0 14/08/08 18:54:22 ERROR actor.OneForOneStrategy: 1 java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:475) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:409) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:261) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:257) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$7.apply(TaskSchedulerImpl.scala:254) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:254) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:254) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:153) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} This causes the job to hang. 
I can deterministically reproduce this by re-running the test, either in isolation or as part of the full performance testing suite. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
Re: Unit tests in 5 minutes
Issue with supporting this imo is the fact that scala-test uses the same vm for all the tests (surefire plugin supports fork, but scala-test ignores it iirc). So different tests would initialize different spark context, and can potentially step on each others toes. Regards, Mridul On Fri, Aug 8, 2014 at 9:31 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Howdy, Do we think it's both feasible and worthwhile to invest in getting our unit tests to finish in under 5 minutes (or something similarly brief) when run by Jenkins? Unit tests currently seem to take anywhere from 30 min to 2 hours. As people add more tests, I imagine this time will only grow. I think it would be better for both contributors and reviewers if they didn't have to wait so long for test results; PR reviews would be shorter, if nothing else. I don't know how how this is normally done, but maybe it wouldn't be too much work to get a test cycle to feel lighter. Most unit tests are independent and can be run concurrently, right? Would it make sense to build a given patch on many servers at once and send disjoint sets of unit tests to each? I'd be interested in working on something like that if possible (and sensible). Nick - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
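Regarding the shared-JVM constraint mentioned at the top of this thread: one common mitigation is to have each suite own a local SparkContext and stop it after use, so contexts never overlap. A hedged sketch of such a trait (Spark's own test code has something along these lines, but this exact version is illustrative):
{code:title=SharedLocalSparkContext.scala}
import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

// Each test gets a fresh local[2] context; afterEach guarantees it is stopped even on failure.
trait SharedLocalSparkContext extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeEach(): Unit = {
    super.beforeEach()
    sc = new SparkContext("local[2]", getClass.getSimpleName)
  }

  override def afterEach(): Unit = {
    try if (sc != null) sc.stop()
    finally { sc = null; super.afterEach() }
  }
}
{code}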
[jira] [Commented] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp
[ https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14088018#comment-14088018 ] Mridul Muralidharan commented on SPARK-2881:
To add, this will affect Spark whenever the tmp directory is not overridden via java.io.tmpdir to something ephemeral. So local, yarn-client, and standalone modes should be affected by default (unless I missed something in the scripts). I am not very familiar with how Mesos runs jobs, so can't comment on that - anyone care to add?
A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a randomly generated directory under java.io.tmpdir as part of Spark startup (only once): this will cause Snappy to use that directory and avoid the issue.
Since Snappy is the default codec now, I am marking this as a blocker for the release.
Snappy is now default codec - could lead to conflicts since uses /tmp
- Key: SPARK-2881 URL: https://issues.apache.org/jira/browse/SPARK-2881 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Thomas Graves Priority: Blocker
I was using the Spark master branch and I ran into an issue with Snappy since it's now the default codec for shuffle. The issue was that someone else had run with Snappy and it had created /tmp/snappy-*.so with restrictive permissions, so I was not able to use it or remove it. This caused my Spark job to not start. I was running in yarn-client mode at the time. Yarn cluster mode shouldn't have this issue since we change java.io.tmpdir. I assume this would also affect standalone mode. I'm not sure if this is a true blocker but wanted to file it as one at first and let us decide.
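A hedged sketch of the workaround described above: point snappy-java's native-library extraction at a per-application directory before the first Snappy class loads. The property name is the one quoted in the comment; the rest is illustrative.
{code:title=SnappyTempDirWorkaround.scala}
import java.nio.file.{Files, Paths}

object SnappyTempDirWorkaround {
  def main(args: Array[String]): Unit = {
    // Create a private directory under java.io.tmpdir so two users on the same host
    // never race on a shared /tmp/snappy-*.so extracted with restrictive permissions.
    val base = Paths.get(System.getProperty("java.io.tmpdir"))
    val privateDir = Files.createTempDirectory(base, "spark-snappy-")
    privateDir.toFile.deleteOnExit()

    // Must be set before snappy-java extracts its native library.
    System.setProperty("org.xerial.snappy.tempdir", privateDir.toString)
  }
}
{code}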
[jira] [Comment Edited] (SPARK-2881) Snappy is now default codec - could lead to conflicts since uses /tmp
[ https://issues.apache.org/jira/browse/SPARK-2881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14088018#comment-14088018 ] Mridul Muralidharan edited comment on SPARK-2881 at 8/6/14 6:45 PM:
To add, this will affect Spark whenever the tmp directory is not overridden via java.io.tmpdir to something ephemeral. So local, yarn-client, and standalone modes should be affected by default (unless I missed something in the scripts). I am not very familiar with how Mesos runs jobs, so can't comment on that - anyone care to add?
A workaround I can think of is to always set 'org.xerial.snappy.tempdir' to a randomly generated directory under java.io.tmpdir as part of Spark startup (only once): this will cause Snappy to use that directory and avoid the issue.
Since Snappy is the default codec now, I am +1 on marking this as a blocker for the release.
Re: -1s on pull requests?
Just came across this mail, thanks for initiating this discussion Kay. To add; another issue which recurs is very rapid commit's: before most contributors have had a chance to even look at the changes proposed. There is not much prior discussion on the jira or pr, and the time between submitting the PR and committing it is 12 hours. Particularly relevant when contributors are not on US timezones and/or colocated; I have raised this a few times before when the commit had other side effects not considered. On flip side we have PR's which have been languishing for weeks with little or no activity from committers side - making the contribution stale; so too long a delay is also definitely not the direction to take either ! Regards, Mridul On Tue, Jul 22, 2014 at 2:14 AM, Kay Ousterhout k...@eecs.berkeley.edu wrote: Hi all, As the number of committers / contributors on Spark has increased, there are cases where pull requests get merged before all the review comments have been addressed. This happens say when one committer points out a problem with the pull request, and another committer doesn't see the earlier comment and merges the PR before the comment has been addressed. This is especially tricky for pull requests with a large number of comments, because it can be difficult to notice early comments describing blocking issues. This also happens when something accidentally gets merged after the tests have started but before tests have passed. Do folks have ideas on how we can handle this issue? Are there other projects that have good ways of handling this? It looks like for Hadoop, people can -1 / +1 on the JIRA. -Kay - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
[jira] [Commented] (SPARK-2685) Update ExternalAppendOnlyMap to avoid buffer.remove()
[ https://issues.apache.org/jira/browse/SPARK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074186#comment-14074186 ] Mridul Muralidharan commented on SPARK-2685: We moved to using java.util.LinkedList for this Update ExternalAppendOnlyMap to avoid buffer.remove() - Key: SPARK-2685 URL: https://issues.apache.org/jira/browse/SPARK-2685 Project: Spark Issue Type: Sub-task Components: Spark Core Reporter: Matei Zaharia This shifts the whole right side of the array back, which can be expensive. It would be better to just swap the last element into the position we want to remove at, then decrease the size of the array. -- This message was sent by Atlassian JIRA (v6.2#6252)
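The issue description's suggestion, sketched out for contrast with buffer.remove(i), which shifts every element to the right of i. As the comment notes, the actual change ended up using java.util.LinkedList instead.
{code:title=SwapRemoveSketch.scala}
import scala.collection.mutable.ArrayBuffer

object SwapRemoveSketch {
  // O(1) removal: overwrite slot i with the last element, then drop the last slot.
  def swapRemove[T](buf: ArrayBuffer[T], i: Int): Unit = {
    buf(i) = buf(buf.length - 1)
    buf.remove(buf.length - 1)     // removing the last element shifts nothing
  }

  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3, 4, 5)
    swapRemove(buf, 1)
    println(buf)                   // ArrayBuffer(1, 5, 3, 4) -- order is not preserved
  }
}
{code}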
[jira] [Created] (SPARK-2532) Fix issues with consolidated shuffle
Mridul Muralidharan created SPARK-2532: -- Summary: Fix issues with consolidated shuffle Key: SPARK-2532 URL: https://issues.apache.org/jira/browse/SPARK-2532 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Environment: All Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Priority: Critical Fix For: 1.1.0 Will file PR with changes as soon as merge is done (earlier merge became outdated in 2 weeks unfortunately :) ). Consolidated shuffle is broken in multiple ways in spark : a) Task failure(s) can cause the state to become inconsistent. b) Multiple revert's or combination of close/revert/close can cause the state to be inconsistent. (As part of exception/error handling). c) Some of the api in block writer causes implementation issues - for example: a revert is always followed by close : but the implemention tries to keep them separate, resulting in surface for errors. d) Fetching data from consolidated shuffle files can go badly wrong if the file is being actively written to : it computes length by subtracting next offset from current offset (or length if this is last offset)- the latter fails when fetch is happening in parallel to write. Note, this happens even if there are no task failures of any kind ! This usually results in stream corruption or decompression errors. -- This message was sent by Atlassian JIRA (v6.2#6252)
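A hedged illustration of the problem described in (d) - hypothetical names, not the actual shuffle code: the size of segment i is inferred from its neighbour's offset, and the last segment's size from the current file length, which is still growing if a writer is appending concurrently.
{code:title=SegmentLengthSketch.scala}
object SegmentLengthSketch {
  // offsets(i) is where map output i starts inside the consolidated file.
  def segmentLength(offsets: Array[Long], i: Int, currentFileLength: Long): Long =
    if (i < offsets.length - 1) offsets(i + 1) - offsets(i)
    else currentFileLength - offsets(i)   // wrong while the file is still being written to

  def main(args: Array[String]): Unit = {
    val offsets = Array(0L, 100L, 250L)
    println(segmentLength(offsets, 2, 400L))  // 150 -- but only if the writer has finished
  }
}
{code}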
[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication
[ https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060543#comment-14060543 ] Mridul Muralidharan commented on SPARK-2468: We map the file content and directly write that to the socket (except when the size is below 8k or so iirc) - are you sure we are copying to user space and back ? zero-copy shuffle network communication --- Key: SPARK-2468 URL: https://issues.apache.org/jira/browse/SPARK-2468 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Reporter: Reynold Xin Assignee: Reynold Xin Priority: Critical Right now shuffle send goes through the block manager. This is inefficient because it requires loading a block from disk into a kernel buffer, then into a user space buffer, and then back to a kernel send buffer before it reaches the NIC. It does multiple copies of the data and context switching between kernel/user. It also creates unnecessary buffer in the JVM that increases GC Instead, we should use FileChannel.transferTo, which handles this in the kernel space with zero-copy. See http://www.ibm.com/developerworks/library/j-zerocopy/ One potential solution is to use Netty NIO. -- This message was sent by Atlassian JIRA (v6.2#6252)
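For reference, a hedged sketch contrasting the two paths being discussed - writing a memory-mapped buffer to a channel (what the comment describes) versus FileChannel.transferTo (what the issue proposes). Names and the threshold are illustrative, not the actual block-manager code.
{code:title=SendFileSketch.scala}
import java.io.{File, RandomAccessFile}
import java.nio.channels.{FileChannel, WritableByteChannel}

object SendFileSketch {
  val mmapThreshold = 8 * 1024  // small files are cheaper to read into a heap buffer

  // Map the file and write the mapped buffer to the destination channel.
  def sendMapped(file: File, dest: WritableByteChannel): Unit = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      val buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, file.length)
      while (buf.hasRemaining) dest.write(buf)
    } finally channel.close()
  }

  // Let the kernel move the bytes without copying them into user space.
  def sendZeroCopy(file: File, dest: WritableByteChannel): Unit = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      var pos = 0L
      while (pos < channel.size) pos += channel.transferTo(pos, channel.size - pos, dest)
    } finally channel.close()
  }
}
{code}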
[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication
[ https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060545#comment-14060545 ] Mridul Muralidharan commented on SPARK-2468: Writing mmap'ed buffers are pretty efficient btw - the second fallback in transferTo implementation iirc. zero-copy shuffle network communication --- Key: SPARK-2468 URL: https://issues.apache.org/jira/browse/SPARK-2468 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Reporter: Reynold Xin Assignee: Reynold Xin Priority: Critical Right now shuffle send goes through the block manager. This is inefficient because it requires loading a block from disk into a kernel buffer, then into a user space buffer, and then back to a kernel send buffer before it reaches the NIC. It does multiple copies of the data and context switching between kernel/user. It also creates unnecessary buffer in the JVM that increases GC Instead, we should use FileChannel.transferTo, which handles this in the kernel space with zero-copy. See http://www.ibm.com/developerworks/library/j-zerocopy/ One potential solution is to use Netty NIO. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2468) zero-copy shuffle network communication
[ https://issues.apache.org/jira/browse/SPARK-2468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14061094#comment-14061094 ] Mridul Muralidharan commented on SPARK-2468: Ah, small files - those are indeed a problem. Btw, we do dispose off map'ed blocks as soon as it is done; so we dont need to wait for gc to free them. Also note that the files are closed as soon as opened and mmap'ed - so they do not count towards open file count/ulimit. Agree on 1, 3 and 4 - some of these apply to sendfile too btw : so not avoidable; but it is the best we have right now. Since we use mmap'ed buffers and rarely transfer the same file again, the performance jump might not be the order(s) of magnitude other projects claim - but then even 10% (or whatever) improvement in our case would be substantial ! zero-copy shuffle network communication --- Key: SPARK-2468 URL: https://issues.apache.org/jira/browse/SPARK-2468 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Reporter: Reynold Xin Assignee: Reynold Xin Priority: Critical Right now shuffle send goes through the block manager. This is inefficient because it requires loading a block from disk into a kernel buffer, then into a user space buffer, and then back to a kernel send buffer before it reaches the NIC. It does multiple copies of the data and context switching between kernel/user. It also creates unnecessary buffer in the JVM that increases GC Instead, we should use FileChannel.transferTo, which handles this in the kernel space with zero-copy. See http://www.ibm.com/developerworks/library/j-zerocopy/ One potential solution is to use Netty NIO. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: better compression codecs for shuffle blocks?
We tried with lower block size for lzf, but it barfed all over the place. Snappy was the way to go for our jobs. Regards, Mridul On Mon, Jul 14, 2014 at 12:31 PM, Reynold Xin r...@databricks.com wrote: Hi Spark devs, I was looking into the memory usage of shuffle and one annoying thing is the default compression codec (LZF) is that the implementation we use allocates buffers pretty generously. I did a simple experiment and found that creating 1000 LZFOutputStream allocated 198976424 bytes (~190MB). If we have a shuffle task that uses 10k reducers and 32 threads running currently, the memory used by the lzf stream alone would be ~ 60GB. In comparison, Snappy only allocates ~ 65MB for every 1k SnappyOutputStream. However, Snappy's compression is slightly lower than LZF's. In my experience, it leads to 10 - 20% increase in size. Compression ratio does matter here because we are sending data across the network. In future releases we will likely change the shuffle implementation to open less streams. Until that happens, I'm looking for compression codec implementations that are fast, allocate small buffers, and have decent compression ratio. Does anybody on this list have any suggestions? If not, I will submit a patch for 1.1 that replaces LZF with Snappy for the default compression codec to lower memory usage. allocation data here: https://gist.github.com/rxin/ad7217ea60e3fb36c567
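A hedged example of the knobs under discussion (1.x property names; treat the exact keys and values as assumptions to verify): switching the shuffle codec to Snappy trades a somewhat lower compression ratio for much smaller per-stream buffers.
{code:title=CodecConfigExample.scala}
import org.apache.spark.SparkConf

object CodecConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("codec-example")
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
      .set("spark.io.compression.snappy.block.size", "32768")  // per-stream buffer size, bytes
    println(conf.toDebugString)
  }
}
{code}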
[jira] [Commented] (SPARK-2398) Trouble running Spark 1.0 on Yarn
[ https://issues.apache.org/jira/browse/SPARK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14060113#comment-14060113 ] Mridul Muralidharan commented on SPARK-2398: As discussed in the PR, I am attempting to list the various factors which contribute to overhead. Note, this is not exhaustive (yet) - please add more to this JIRA - so that when we are reasonably sure, we can model the expected overhead based on these factors. These factors are typically off-heap - since anything within heap is budgetted for by Xmx - and enforced by VM : and so should ideally (not practically always, see gc overheads) not exceed the Xmx value 1) 256 KB per socket accepted via ConnectionManager for inter-worker comm (setReceiveBufferSize) Typically, there will be (numExecutor - 1) number of sockets open. 2) 128 KB per socket for writing output to dfs. For reads, this does not seem to be configured - and should be 8k per socket iirc. Typically 1 per executor at a given point in time ? 3) 256k for each akka socket for send/receive buffer. One per worker ? (to talk to master) - so 512kb ? Any other use of akka ? 4) If I am not wrong, netty might allocate multiple spark.akka.frameSize sized direct buffer. There might be a few of these allocated and pooled/reused. I did not go in detail into netty code though. If someone else with more knowhow can clarify, that would be great ! Default size of 10mb for spark.akka.frameSize 5) The default size of the assembled spark jar is about 12x mb (and changing) - though not all classes get loaded, the overhead would be some function of this. The actual footprint would be higher than the on-disk size. IIRC this is outside of the heap - [~sowen], any comments on this ? I have not looked into these in like 10 years now ! 6) Per thread (Xss) overhead of 1mb (for 64bit vm). Last I recall, we have about 220 odd threads - not sure if this was at the master or on the workers. Ofcourse, this is dependent on the various threadpools we use (io, computation, etc), akka and netty config, etc. 7) Disk read overhead. Thanks for [~pwendell]'s fix, atleast for small files, the overhead is not too high - since we do not mmap files but directly read them. But for anything larger than 8kb (default), we use memory mapped buffers. The actual overhead depends on the number of files opened for read via DiskStore - and the entire file contents get mmap'ed into virt mem. Note that there is some non-virt-mem overhead also at native level for these buffers. The actual number of files opened should be carefully tracked to understand the effect of this on spark overhead : since this aspect is changing a lot off late. Impact is on shuffle, disk persisted rdd, among others. The actual value would be application dependent (how large the data is !) 8) The overhead introduced by VM not being able to reclaim memory completely (the cost of moving data vs amount of space reclaimed). Ideally, this should be low - but would be dependent on the heap space, collector used, among other things. I am not very knowledgable of the recent advances in gc collectors, so I hesitate to put a number to this. I am sure this is not an exhaustive list, please do add to this. In our case specifically, and [~tgraves] could add more, the number of containers can be high (300+ is easily possible), memory per container is modest (8gig usually). 
To add details of observed overhead patterns (from the PR discussion) - a) I have had inhouse GBDT impl run without customizing overhead (so default of 384 mb) with 12gb container and 22 nodes on reasonably large dataset. b) I have had to customize overhead to 1.7gb for collaborative filtering with 8gb container and 300 nodes (on a fairly large dataset). c) I have had to minimally customize overhead to do inhouse QR factorization of a 50k x 50k distributed dense matrix on 45 nodes at 12 gb each (this was incorrectly specified in the PR discussion). Trouble running Spark 1.0 on Yarn -- Key: SPARK-2398 URL: https://issues.apache.org/jira/browse/SPARK-2398 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Nishkam Ravi Trouble running workloads in Spark-on-YARN cluster mode for Spark 1.0. For example: SparkPageRank when run in standalone mode goes through without any errors (tested for up to 30GB input dataset on a 6-node cluster). Also runs fine for a 1GB dataset in yarn cluster mode. Starts to choke (in yarn cluster mode) as the input data size is increased. Confirmed for 16GB input dataset. The same workload runs fine with Spark 0.9 in both standalone and yarn cluster mode (for up to 30 GB input dataset on a 6-node cluster). Commandline used: (/opt/cloudera/parcels/CDH/lib/spark/bin/spark-submit
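To make the "model the expected overhead based on these factors" idea above concrete, a hedged back-of-the-envelope sketch. Every constant is the rough per-item figure quoted in the comment, or an explicit parameter where the comment gives none; none of them are measured values.
{code:title=OverheadEstimateSketch.scala}
object OverheadEstimateSketch {
  def estimateOverheadMB(numExecutors: Int,         // peers in ConnectionManager
                         numThreads: Int,           // live threads, ~1 MB Xss each
                         akkaFrameSizeMB: Int,      // spark.akka.frameSize
                         nettyFrameBuffers: Int,    // pooled frame-sized buffers (assumption)
                         classFootprintMB: Int,     // loaded-class overhead (assumption)
                         mappedFileMB: Long): Long = {  // mmap'ed shuffle / persisted blocks
    val connectionBuffersMB = (numExecutors - 1) * 256L / 1024   // ~256 KB per accepted socket
    val dfsAndAkkaMB        = (128L + 2 * 256L) / 1024           // DFS write + akka send/recv buffers
    val nettyMB             = nettyFrameBuffers.toLong * akkaFrameSizeMB
    val threadStacksMB      = numThreads.toLong * 1
    connectionBuffersMB + dfsAndAkkaMB + nettyMB + threadStacksMB + classFootprintMB + mappedFileMB
  }

  def main(args: Array[String]): Unit =
    println(estimateOverheadMB(numExecutors = 300, numThreads = 220, akkaFrameSizeMB = 10,
                               nettyFrameBuffers = 4, classFootprintMB = 150, mappedFileMB = 512))
}
{code}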
Re: [GitHub] spark pull request: Modify default YARN memory_overhead-- from an ...
You are lucky :-) for some of our jobs, in a 8gb container, overhead is 1.8gb ! On 13-Jul-2014 2:40 pm, nishkamravi2 g...@git.apache.org wrote: Github user nishkamravi2 commented on the pull request: https://github.com/apache/spark/pull/1391#issuecomment-48835560 Sean, the memory_overhead is fairly substantial. More than 2GB for a 30GB executor. Less than 400MB for a 2GB executor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Unresponsive to PR/jira changes
Hi, I noticed today that gmail has been marking most of the mails from spark github/jira I was receiving to spam folder; and I was assuming it was lull in activity due to spark summit for past few weeks ! In case I have commented on specific PR/JIRA issues and not followed up, apologies for the same - please do reach out in case it is still pending something from my end. Regards, Mridul
Re: on shark, is tachyon less efficient than memory_only cache strategy ?
You are ignoring serde costs :-) - Mridul On Tue, Jul 8, 2014 at 8:48 PM, Aaron Davidson ilike...@gmail.com wrote: Tachyon should only be marginally less performant than memory_only, because we mmap the data from Tachyon's ramdisk. We do not have to, say, transfer the data over a pipe from Tachyon; we can directly read from the buffers in the same way that Shark reads from its in-memory columnar format. On Tue, Jul 8, 2014 at 1:18 AM, qingyang li liqingyang1...@gmail.com wrote: hi, when i create a table, i can point the cache strategy using shark.cache, i think shark.cache=memory_only means data are managed by spark, and data are in the same jvm with excutor; while shark.cache=tachyon means data are managed by tachyon which is off heap, and data are not in the same jvm with excutor, so spark will load data from tachyon for each query sql , so, is tachyon less efficient than memory_only cache strategy ? if yes, can we let spark load all data once from tachyon for all sql query if i want to use tachyon cache strategy since tachyon is more HA than memory_only ?
[jira] [Commented] (SPARK-2390) Files in staging directory cannot be deleted and wastes the space of HDFS
[ https://issues.apache.org/jira/browse/SPARK-2390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14054162#comment-14054162 ] Mridul Muralidharan commented on SPARK-2390: Here, and a bunch of other places, spark currently closes the Filesystem instance : this is incorrect, and should not be done. The fix would be to remove the fs.close; not force creation of new instances. Files in staging directory cannot be deleted and wastes the space of HDFS - Key: SPARK-2390 URL: https://issues.apache.org/jira/browse/SPARK-2390 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0, 1.0.1 Reporter: Kousuke Saruta When running jobs with YARN Cluster mode and using HistoryServer, the files in the Staging Directory cannot be deleted. HistoryServer uses directory where event log is written, and the directory is represented as a instance of o.a.h.f.FileSystem created by using FileSystem.get. {code:title=FileLogger.scala} private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) {code} {code:title=utils.getHadoopFileSystem} def getHadoopFileSystem(path: URI): FileSystem = { FileSystem.get(path, SparkHadoopUtil.get.newConfiguration()) } {code} On the other hand, ApplicationMaster has a instance named fs, which also created by using FileSystem.get. {code:title=ApplicationMaster} private val fs = FileSystem.get(yarnConf) {code} FileSystem.get returns cached same instance when URI passed to the method represents same file system and the method is called by same user. Because of the behavior, when the directory for event log is on HDFS, fs of ApplicationMaster and fileSystem of FileLogger is same instance. When shutting down ApplicationMaster, fileSystem.close is called in FileLogger#stop, which is invoked by SparkContext#stop indirectly. {code:title=FileLogger.stop} def stop() { hadoopDataStream.foreach(_.close()) writer.foreach(_.close()) fileSystem.close() } {code} And ApplicationMaster#cleanupStagingDir also called by JVM shutdown hook. In this method, fs.delete(stagingDirPath) is invoked. Because fs.delete in ApplicationMaster is called after fileSystem.close in FileLogger, fs.delete fails and results not deleting files in the staging directory. -- This message was sent by Atlassian JIRA (v6.2#6252)
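A short sketch of why removing the close() is the right fix: FileSystem.get returns a cached instance shared per (scheme, authority, user), so closing it in one component closes it for every other caller in the same JVM. The HDFS URI below is a placeholder.
{code:title=SharedFileSystemSketch.scala}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object SharedFileSystemSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val a = FileSystem.get(new URI("hdfs://namenode:8020/"), conf)
    val b = FileSystem.get(new URI("hdfs://namenode:8020/"), conf)
    println(a eq b)   // true -- the same cached instance
    a.close()         // now b (e.g. the ApplicationMaster's fs) is closed as well
    // The fix suggested above is simply to drop the close() call and let the cache own the
    // instance's lifecycle, rather than forcing FileSystem.newInstance everywhere.
  }
}
{code}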
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052275#comment-14052275 ] Mridul Muralidharan commented on SPARK-2277: Hmm, good point - that PR does change the scheduler expectations in a lot of ways which were not all anticipated. Let me go through the current PR; thanks for the bug ! Make TaskScheduler track whether there's host on a rack --- Key: SPARK-2277 URL: https://issues.apache.org/jira/browse/SPARK-2277 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Rui Li When TaskSetManager adds a pending task, it checks whether the tasks's preferred location is available. Regarding RACK_LOCAL task, we consider the preferred rack available if such a rack is defined for the preferred host. This is incorrect as there may be no alive hosts on that rack at all. Therefore, TaskScheduler should track the hosts on each rack, and provides an API for TaskSetManager to check if there's host alive on a specific rack. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large
[ https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052289#comment-14052289 ] Mridul Muralidharan commented on SPARK-2017: With aggregated metrics, we lose the ability to check for GC time (which is actually what I use that UI for, other than to dig up exceptions on failed tasks). web ui stage page becomes unresponsive when the number of tasks is large Key: SPARK-2017 URL: https://issues.apache.org/jira/browse/SPARK-2017 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin Labels: starter {code} sc.parallelize(1 to 1000000, 1000000).count() {code} The above code creates one million tasks to be executed. The stage detail web ui page takes forever to load (if it ever completes). There are again a few different alternatives: 0. Limit the number of tasks we show. 1. Pagination 2. By default only show the aggregate metrics and failed tasks, and hide the successful ones. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large
[ https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14052679#comment-14052679 ] Mridul Muralidharan commented on SPARK-2017: Sounds great: the ability to get to currently running tasks (to check current state), the ability to get to task failures (usually to debug), some aggregate stats (GC, stats per executor, etc.), and some way to get to the full details (which is what is seen currently) in an advanced or full view. Anything else would be a bonus :-) web ui stage page becomes unresponsive when the number of tasks is large Key: SPARK-2017 URL: https://issues.apache.org/jira/browse/SPARK-2017 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin Labels: starter {code} sc.parallelize(1 to 1000000, 1000000).count() {code} The above code creates one million tasks to be executed. The stage detail web ui page takes forever to load (if it ever completes). There are again a few different alternatives: 0. Limit the number of tasks we show. 1. Pagination 2. By default only show the aggregate metrics and failed tasks, and hide the successful ones. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Eliminate copy while sending data : any Akka experts here ?
In our clusters, the number of containers we can get is high but memory per container is low : which is why avg_nodes_not_hosting_data is rarely zero for ML tasks :-) To update - to unblock our current implementation efforts, we went with broadcast - since it is intuitively easier and a minimal change; and compress the array as bytes in TaskResult. This is then stored in disk-backed maps - to remove memory pressure on master and workers (else MapOutputTracker becomes a memory hog). But I agree, a compressed bitmap to represent 'large' blocks (anything larger than maxBytesInFlight actually), and probably an existing one to track non-zero blocks, should be fine (we should not really track zero output for a reducer - just a waste of space). Regards, Mridul On Fri, Jul 4, 2014 at 3:43 AM, Reynold Xin r...@databricks.com wrote: Note that in my original proposal, I was suggesting we could track whether block size = 0 using a compressed bitmap. That way we can still avoid requests for zero-sized blocks. On Thu, Jul 3, 2014 at 3:12 PM, Reynold Xin r...@databricks.com wrote: Yes, that number is likely == 0 in any real workload ... On Thu, Jul 3, 2014 at 8:01 AM, Mridul Muralidharan mri...@gmail.com wrote: On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin r...@databricks.com wrote: On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan mri...@gmail.com wrote: The other thing we do need is the location of blocks. This is actually just O(n) because we just need to know where the map was run. For well partitioned data, won't this involve a lot of unwanted requests to nodes which are not hosting data for a reducer (and lack of ability to throttle). Was that a question? (I'm guessing it is). What do you mean exactly? I was not sure if I understood the proposal correctly - hence the query : if I understood it right - the number of wasted requests goes up by num_reducers * avg_nodes_not_hosting_data. Of course, if avg_nodes_not_hosting_data == 0, then we are fine ! Regards, Mridul
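To make the bitmap idea above concrete, here is a small illustrative sketch (not the actual MapOutputTracker code; a real implementation would use a compressed bitmap such as Roaring rather than java.util.BitSet): the map side records which reduce partitions got zero bytes, and reducers skip fetch requests for those blocks.
{code}
import java.util.BitSet

// Illustrative sketch only: per map output, remember which reduce partitions are
// empty so reducers never issue a fetch for a zero-sized block.
class MapOutputSketch(val location: String, numReducers: Int) {
  private val emptyBlocks = new BitSet(numReducers)

  def recordSize(reduceId: Int, bytes: Long): Unit =
    if (bytes == 0L) emptyBlocks.set(reduceId)

  // A reducer asks this before adding the block to a FetchRequest.
  def shouldFetch(reduceId: Int): Boolean = !emptyBlocks.get(reduceId)
}
{code}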
[jira] [Created] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler
Mridul Muralidharan created SPARK-2353: -- Summary: ArrayIndexOutOfBoundsException in scheduler Key: SPARK-2353 URL: https://issues.apache.org/jira/browse/SPARK-2353 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.1.0 Reporter: Mridul Muralidharan Priority: Blocker I suspect the recent changes from SPARK-1937 to compute valid locality levels (and ignoring ones which are not applicable) have resulted in this issue. Specifically, some of the code using currentLocalityIndex (and lastLaunchTime actually) seems to be assuming a) a constant population of locality levels. b) probably also immutability/repeatability of locality levels These do not hold any longer. I do not have the exact values for which this failure was observed (since this is from the logs of a failed job) - but the code path is highly suspect. Also note that the line numbers/classes might not exactly match master since we are in the middle of a merge. But the issue should hopefully be evident. java.lang.ArrayIndexOutOfBoundsException: 2 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- This message was sent by Atlassian JIRA (v6.2#6252)
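A hypothetical illustration of the failure mode being described (the level names are real TaskLocality values, but the arrays and index here are made up, not the TaskSetManager code): if the set of valid locality levels is recomputed and shrinks, an index computed against the old array can run past the end of the new one.
{code}
// Hypothetical illustration only.
val oldLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
var currentLocalityIndex = 2                   // points at RACK_LOCAL in oldLevels
val newLevels = Array("PROCESS_LOCAL", "ANY")  // recomputed after executors changed
// newLevels(currentLocalityIndex) throws ArrayIndexOutOfBoundsException: 2,
// matching the trace in the description; any fix needs to re-derive or clamp the
// index whenever the locality level array is recomputed.
{code}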
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14051575#comment-14051575 ] Mridul Muralidharan commented on SPARK-2277: I have not rechecked the code, but the way it was originally written by me was : a) Task preference is decoupled from availability of the node. For example, we need not have an executor on a host for which a block has a host preference (for example, DFS blocks on a shared cluster). Also note that a block might have one or more preferred locations. b) We look up the rack for the preferred location to get the preferred rack. As with (a), there need not be an executor on that rack. This is just the rack preference. c) At schedule time, for an executor, we look up the host/rack of the executor's location - and decide appropriately based on that. In this context, I think your requirement is already handled. Even if we don't have any hosts alive on a rack, those tasks would still be listed with a rack-local preference in the task set manager. When an executor comes in (existing or new), we check that executor's rack against the task preference - and it would now be marked rack local. Make TaskScheduler track whether there's host on a rack --- Key: SPARK-2277 URL: https://issues.apache.org/jira/browse/SPARK-2277 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Rui Li When TaskSetManager adds a pending task, it checks whether the task's preferred location is available. Regarding RACK_LOCAL tasks, we consider the preferred rack available if such a rack is defined for the preferred host. This is incorrect as there may be no alive hosts on that rack at all. Therefore, TaskScheduler should track the hosts on each rack, and provide an API for TaskSetManager to check if there's a host alive on a specific rack. -- This message was sent by Atlassian JIRA (v6.2#6252)
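A sketch of what the requested tracking could look like (the class and member names here are illustrative, not the actual TaskSchedulerImpl fields): maintain a rack -> live hosts index as executors come and go, so a TaskSetManager can ask whether a preferred rack currently has any alive host.
{code}
import scala.collection.mutable

// Illustrative sketch of rack -> alive hosts tracking for the scheduler.
class RackIndex {
  private val hostsByRack = new mutable.HashMap[String, mutable.HashSet[String]]

  def executorAdded(host: String, rack: Option[String]): Unit =
    rack.foreach { r =>
      hostsByRack.getOrElseUpdate(r, new mutable.HashSet[String]) += host
    }

  def executorRemoved(host: String, rack: Option[String]): Unit =
    rack.foreach { r =>
      hostsByRack.get(r).foreach { hosts =>
        hosts -= host
        if (hosts.isEmpty) hostsByRack -= r
      }
    }

  // What TaskSetManager would call when deciding if a RACK_LOCAL level is valid.
  def hasAliveHostOnRack(rack: String): Boolean = hostsByRack.contains(rack)
}
{code}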
[jira] [Updated] (SPARK-2353) ArrayIndexOutOfBoundsException in scheduler
[ https://issues.apache.org/jira/browse/SPARK-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated SPARK-2353: --- Description: I suspect the recent changes from SPARK-1937 to compute valid locality levels (and ignoring ones which are not applicable) have resulted in this issue. Specifically, some of the code using currentLocalityIndex (and lastLaunchTime actually) seems to be assuming a) a constant population of locality levels. b) probably also immutability/repeatability of locality levels These do not hold any longer. I do not have the exact values for which this failure was observed (since this is from the logs of a failed job) - but the code path is suspect. Also note that the line numbers/classes might not exactly match master since we are in the middle of a merge. But the issue should hopefully be evident. java.lang.ArrayIndexOutOfBoundsException: 2 at org.apache.spark.scheduler.TaskSetManager.getAllowedLocalityLevel(TaskSetManager.scala:439) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:388) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:248) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:244) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:241) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:241) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:133) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:86) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Unfortunately, we do not have the bandwidth to tackle this issue - would be great if someone could take a look at it ! Thanks.
Re: Eliminate copy while sending data : any Akka experts here ?
On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin r...@databricks.com wrote: On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan mri...@gmail.com wrote: The other thing we do need is the location of blocks. This is actually just O(n) because we just need to know where the map was run. For well partitioned data, won't this involve a lot of unwanted requests to nodes which are not hosting data for a reducer (and lack of ability to throttle). Was that a question? (I'm guessing it is). What do you mean exactly? I was not sure if I understood the proposal correctly - hence the query : if I understood it right - the number of wasted requests goes up by num_reducers * avg_nodes_not_hosting_data. Of course, if avg_nodes_not_hosting_data == 0, then we are fine! Regards, Mridul
[jira] [Commented] (SPARK-2277) Make TaskScheduler track whether there's host on a rack
[ https://issues.apache.org/jira/browse/SPARK-2277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14050886#comment-14050886 ] Mridul Muralidharan commented on SPARK-2277: I am not sure I follow this requirement. For preferred locations, we populate their corresponding racks (if available) as the preferred rack. For available executor hosts, we look up the rack they belong to - and then see if that rack is preferred or not. This, of course, assumes a host is only on a single rack. What exactly is the behavior you are expecting from the scheduler? Make TaskScheduler track whether there's host on a rack --- Key: SPARK-2277 URL: https://issues.apache.org/jira/browse/SPARK-2277 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Rui Li When TaskSetManager adds a pending task, it checks whether the task's preferred location is available. Regarding RACK_LOCAL tasks, we consider the preferred rack available if such a rack is defined for the preferred host. This is incorrect as there may be no alive hosts on that rack at all. Therefore, TaskScheduler should track the hosts on each rack, and provide an API for TaskSetManager to check if there's a host alive on a specific rack. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Eliminate copy while sending data : any Akka experts here ?
Hi Patrick, Please see inline. Regards, Mridul On Wed, Jul 2, 2014 at 10:52 AM, Patrick Wendell pwend...@gmail.com wrote: b) Instead of pulling this information, push it to executors as part of task submission. (What Patrick mentioned ?) (1) a.1 from above is still an issue for this. I don't understand problem a.1 is. In this case, we don't need to do caching, right? To rephrase in this context, attempting to cache wont help since it is reducer specific and benefits are minimal (other than for reexecution for failures and speculative tasks). (2) Serialized task size is also a concern : we have already seen users hitting akka limits for task size - this will be an additional vector which might exacerbate it. This would add only a small, constant amount of data to the task. It's strictly better than before. Before if the map output status array was size M x R, we send a single akka message to every node of size M x R... this basically scales quadratically with the size of the RDD. The new approach is constant... it's much better. And the total amount of data send over the wire is likely much less. It would be a function of the number of mappers - and an overhead for each task. Regards, Mridul - Patrick
Re: Eliminate copy while sending data : any Akka experts here ?
Hi Reynold, Please see inline. Regards, Mridul On Wed, Jul 2, 2014 at 10:57 AM, Reynold Xin r...@databricks.com wrote: I was actually talking to tgraves today at the summit about this. Based on my understanding, the sizes we track and send (which is unfortunately O(M*R) regardless of how we change the implementation -- whether we send via task or send via MapOutputTracker) is only used to compute maxBytesInFlight so we can throttle the fetching speed to not result in oom. Perhaps for very large shuffles, we don't need to send the bytes for each block, and we can send whether they are zero or not (which can be tracked via a compressed bitmap that can be tiny). You are right, currently for large blocks, we just need to know where the block exists. I was not sure if there was any possible future extension on that - for this reason, in order to preserve functionality, we moved to using Short from Byte for MapOutputTracker.compressedSize (to ensure large sizes can be represented with 0.7% error). Within a MapStatus, we moved to holding compressed data to save on space within master/workers (particularly for large number of reducers). If we do not anticipate any other reason for size, we can move back to using Byte instead of Short to compress size (which will reduce required space by some factor less than 2) : since error in computed size for blocks larger than maxBytesInFlight does not really matter : we will split them into different FetchRequest's. The other thing we do need is the location of blocks. This is actually just O(n) because we just need to know where the map was run. For well partitioned data, wont this not involve a lot of unwanted requests to nodes which are not hosting data for a reducer (and lack of ability to throttle). Regards, Mridul On Tue, Jul 1, 2014 at 2:51 AM, Mridul Muralidharan mri...@gmail.com wrote: We had considered both approaches (if I understood the suggestions right) : a) Pulling only map output states for tasks which run on the reducer by modifying the Actor. (Probably along lines of what Aaron described ?) The performance implication of this was bad : 1) We cant cache serialized result anymore, (caching it makes no sense rather). 2) The number requests to master will go from num_executors to num_reducers - the latter can be orders of magnitude higher than former. b) Instead of pulling this information, push it to executors as part of task submission. (What Patrick mentioned ?) (1) a.1 from above is still an issue for this. (2) Serialized task size is also a concern : we have already seen users hitting akka limits for task size - this will be an additional vector which might exacerbate it. Our jobs are not hitting this yet though ! I was hoping there might be something in akka itself to alleviate this - but if not, we can solve it within context of spark. Currently, we have worked around it by using broadcast variable when serialized size is above some threshold - so that our immediate concerns are unblocked :-) But a better solution should be greatly welcomed ! Maybe we can unify it with large serialized task as well ... Btw, I am not sure what the higher cost of BlockManager referred to is Aaron - do you mean the cost of persisting the serialized map outputs to disk ? Regards, Mridul On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell pwend...@gmail.com wrote: Yeah I created a JIRA a while back to piggy-back the map status info on top of the task (I honestly think it will be a small change). 
There isn't a good reason to broadcast the entire array and it can be an issue during large shuffles. - Patrick On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com wrote: I don't know of any way to avoid Akka doing a copy, but I would like to mention that it's on the priority list to piggy-back only the map statuses relevant to a particular map task on the task itself, thus reducing the total amount of data sent over the wire by a factor of N for N physical machines in your cluster. Ideally we would also avoid Akka entirely when sending the tasks, as these can get somewhat large and Akka doesn't work well with large messages. Do note that your solution of using broadcast to send the map tasks is very similar to how the executor returns the result of a task when it's too big for akka. We were thinking of refactoring this too, as using the block manager has much higher latency than a direct TCP send. On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com wrote: Our current hack is to use Broadcast variables when serialized statuses are above some (configurable) size : and have the workers directly pull them from master. This is a workaround : so would be great if there was a better/principled solution. Please note that the responses are going to different workers requesting for the output statuses for shuffle (after
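A rough sketch of the single-byte size encoding being weighed in this thread (an illustration of the idea, not the exact MapOutputTracker code): store the size as its logarithm in base 1.1, which bounds the relative error to roughly 10% while using one byte per block instead of a Long.
{code}
// Illustrative log-based size compression: ~10% granularity, 1 byte per block.
object SizeCodec {
  private val LogBase = 1.1

  def compress(size: Long): Byte =
    if (size <= 0L) 0.toByte
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt).toByte

  def decompress(compressed: Byte): Long =
    if (compressed == 0) 0L
    else math.pow(LogBase, compressed & 0xFF).toLong
}
{code}
As noted above, the imprecision only matters for blocks smaller than maxBytesInFlight; larger blocks end up in their own FetchRequests anyway.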
Re: Eliminate copy while sending data : any Akka experts here ?
We had considered both approaches (if I understood the suggestions right) : a) Pulling only map output states for tasks which run on the reducer by modifying the Actor. (Probably along lines of what Aaron described ?) The performance implication of this was bad : 1) We cant cache serialized result anymore, (caching it makes no sense rather). 2) The number requests to master will go from num_executors to num_reducers - the latter can be orders of magnitude higher than former. b) Instead of pulling this information, push it to executors as part of task submission. (What Patrick mentioned ?) (1) a.1 from above is still an issue for this. (2) Serialized task size is also a concern : we have already seen users hitting akka limits for task size - this will be an additional vector which might exacerbate it. Our jobs are not hitting this yet though ! I was hoping there might be something in akka itself to alleviate this - but if not, we can solve it within context of spark. Currently, we have worked around it by using broadcast variable when serialized size is above some threshold - so that our immediate concerns are unblocked :-) But a better solution should be greatly welcomed ! Maybe we can unify it with large serialized task as well ... Btw, I am not sure what the higher cost of BlockManager referred to is Aaron - do you mean the cost of persisting the serialized map outputs to disk ? Regards, Mridul On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell pwend...@gmail.com wrote: Yeah I created a JIRA a while back to piggy-back the map status info on top of the task (I honestly think it will be a small change). There isn't a good reason to broadcast the entire array and it can be an issue during large shuffles. - Patrick On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson ilike...@gmail.com wrote: I don't know of any way to avoid Akka doing a copy, but I would like to mention that it's on the priority list to piggy-back only the map statuses relevant to a particular map task on the task itself, thus reducing the total amount of data sent over the wire by a factor of N for N physical machines in your cluster. Ideally we would also avoid Akka entirely when sending the tasks, as these can get somewhat large and Akka doesn't work well with large messages. Do note that your solution of using broadcast to send the map tasks is very similar to how the executor returns the result of a task when it's too big for akka. We were thinking of refactoring this too, as using the block manager has much higher latency than a direct TCP send. On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan mri...@gmail.com wrote: Our current hack is to use Broadcast variables when serialized statuses are above some (configurable) size : and have the workers directly pull them from master. This is a workaround : so would be great if there was a better/principled solution. Please note that the responses are going to different workers requesting for the output statuses for shuffle (after map) - so not sure if back pressure buffers, etc would help. Regards, Mridul On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan mri...@gmail.com wrote: Hi, While sending map output tracker result, the same serialized byte array is sent multiple times - but the akka implementation copies it to a private byte array within ByteString for each send. 
Caching a ByteString instead of Array[Byte] did not help, since akka does not support special-casing ByteString : it serializes the ByteString, and copies the result out to an array before creating a ByteString out of it (for Array[Byte], serializing is thankfully simply returning the same array - so one copy only). Given the need to send immutable data a large number of times, is there any way to do it in akka without copying internally in akka? To see how expensive it is: for 200 nodes with a large number of mappers and reducers, the status becomes something like 30 MB for us - and pulling this about 200 to 300 times results in OOM due to the large number of copies sent out. Thanks, Mridul
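A hedged sketch of the workaround described in this thread (the threshold, method name, and return shape are made up for illustration; this is not the code that was actually merged): when the serialized map output statuses exceed a configurable size, ship them via a broadcast variable so executors pull the bytes instead of receiving a large akka message per request.
{code}
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Sketch only: choose between pushing the serialized statuses directly and
// publishing them as a broadcast that workers pull from the master.
def shipStatuses(
    sc: SparkContext,
    serializedStatuses: Array[Byte],
    thresholdBytes: Int): Either[Broadcast[Array[Byte]], Array[Byte]] = {
  if (serializedStatuses.length > thresholdBytes) {
    // Workers fetch the broadcast blocks directly, avoiding repeated large
    // copies inside akka for every requesting executor.
    Left(sc.broadcast(serializedStatuses))
  } else {
    Right(serializedStatuses)
  }
}
{code}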
[jira] [Commented] (SPARK-2294) TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks get assigned to an executor
[ https://issues.apache.org/jira/browse/SPARK-2294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14045433#comment-14045433 ] Mridul Muralidharan commented on SPARK-2294: I agree; we should bump no-locality-preference and speculative tasks to the NODE_LOCAL level after NODE_LOCAL tasks have been scheduled (if available), and not check for them at PROCESS_LOCAL max locality. So they get scheduled before RACK_LOCAL but after NODE_LOCAL. This is an artifact of the design from when there was no PROCESS_LOCAL and NODE_LOCAL was the best schedule possible (without explicitly having these levels : we had node and any). TaskSchedulerImpl and TaskSetManager do not properly prioritize which tasks get assigned to an executor --- Key: SPARK-2294 URL: https://issues.apache.org/jira/browse/SPARK-2294 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0, 1.0.1 Reporter: Kay Ousterhout If an executor E is free, a task may be speculatively assigned to E when there are other tasks in the job that have not been launched (at all) yet. Similarly, a task without any locality preferences may be assigned to E when there was another NODE_LOCAL task that could have been scheduled. This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer (which in turn calls TaskSetManager.findTask) with increasing locality levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until the highest currently allowed level. Now, suppose NODE_LOCAL is the highest currently allowed locality level. The first time findTask is called, it will be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, it will try to schedule tasks with no locality preferences or speculative tasks. As a result, speculative tasks or tasks with no preferences may be scheduled instead of NODE_LOCAL tasks. cc [~matei] -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown
[ https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043088#comment-14043088 ] Mridul Muralidharan commented on SPARK-2268: That is not because of this hook. There are a bunch of places in spark where filesystem objects are (incorrectly I should add) getting closed : some within shutdown hooks (check in stop method in various services in spark) and others elsewhere (like checkpointing code). I have fixed a bunch of these as part of some other work ... should come in a PR soon. Utils.createTempDir() creates race with HDFS at shutdown Key: SPARK-2268 URL: https://issues.apache.org/jira/browse/SPARK-2268 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Marcelo Vanzin Utils.createTempDir() has this code: {code} // Add a shutdown hook to delete the temp dir when the JVM exits Runtime.getRuntime.addShutdownHook(new Thread(delete Spark temp dir + dir) { override def run() { // Attempt to delete if some patch which is parent of this is not already registered. if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir) } }) {code} This creates a race with the shutdown hooks registered by HDFS, since the order of execution is undefined; if the HDFS hooks run first, you'll get exceptions about the file system being closed. Instead, this should use Hadoop's ShutdownHookManager with a proper priority, so that it runs before the HDFS hooks. -- This message was sent by Atlassian JIRA (v6.2#6252)
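For reference, a hedged sketch of the alternative the description proposes (assuming Hadoop 2.x's org.apache.hadoop.util.ShutdownHookManager is on the classpath): register the cleanup with a priority above the FileSystem hook so it runs before HDFS clients are closed.
{code}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

// Sketch only: run the temp-dir cleanup before Hadoop's own FileSystem
// shutdown hook (which runs at priority FileSystem.SHUTDOWN_HOOK_PRIORITY).
ShutdownHookManager.get().addShutdownHook(new Runnable {
  override def run(): Unit = {
    // e.g. if (!hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
  }
}, FileSystem.SHUTDOWN_HOOK_PRIORITY + 10)
{code}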
[jira] [Commented] (SPARK-2268) Utils.createTempDir() creates race with HDFS at shutdown
[ https://issues.apache.org/jira/browse/SPARK-2268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14043071#comment-14043071 ] Mridul Muralidharan commented on SPARK-2268: Setting priority for shutdown hooks does not have too much impact given the state of the VM. Note that this hook is trying to delete local directories - not dfs directories. Utils.createTempDir() creates race with HDFS at shutdown Key: SPARK-2268 URL: https://issues.apache.org/jira/browse/SPARK-2268 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Marcelo Vanzin Utils.createTempDir() has this code: {code} // Add a shutdown hook to delete the temp dir when the JVM exits Runtime.getRuntime.addShutdownHook(new Thread(delete Spark temp dir + dir) { override def run() { // Attempt to delete if some patch which is parent of this is not already registered. if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir) } }) {code} This creates a race with the shutdown hooks registered by HDFS, since the order of execution is undefined; if the HDFS hooks run first, you'll get exceptions about the file system being closed. Instead, this should use Hadoop's ShutdownHookManager with a proper priority, so that it runs before the HDFS hooks. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: [jira] [Created] (SPARK-1867) Spark Documentation Error causes java.lang.IllegalStateException: unread block data
There are a few interacting issues here - and unfortunately I don't recall all of them (since this was fixed a few months back). From memory though : a) With shuffle consolidation, data sent to a remote node incorrectly includes data from partially constructed blocks - not just the requested blocks. Actually, with shuffle consolidation (with and without failures) quite a few things broke. b) There might have been a few other bugs in DiskBlockObjectWriter too. c) We also suspected buffers overlapping when using the cached Kryo serializer (though never proved this; we just disabled caching across the board for now : and always create a new instance). The way we debugged it is by introducing an Input/Output stream which introduced a checksum into the data stream and validated it on each side for compression, serialization, etc. Apologies for being non-specific ... I really don't have the details right now, and our internal branch is in flux due to the merge effort to port our local changes to master. Hopefully we will be able to submit PRs as soon as this is done and test cases are added to validate. Regards, Mridul On Tue, Jun 24, 2014 at 10:21 AM, Reynold Xin r...@databricks.com wrote: Mridul, Can you comment a little bit more on this issue? We are running into the same stack trace but not sure whether it is just different Spark versions on each cluster (doesn't seem likely) or a bug in Spark. Thanks. On Sat, May 17, 2014 at 4:41 AM, Mridul Muralidharan mri...@gmail.com wrote: I suspect this is an issue we have fixed internally here as part of a larger change - the issue we fixed was not a config issue but bugs in spark. Unfortunately we plan to contribute this as part of 1.1 Regards, Mridul On 17-May-2014 4:09 pm, sam (JIRA) j...@apache.org wrote: sam created SPARK-1867: -- Summary: Spark Documentation Error causes java.lang.IllegalStateException: unread block data Key: SPARK-1867 URL: https://issues.apache.org/jira/browse/SPARK-1867 Project: Spark Issue Type: Bug Reporter: sam I've employed two System Administrators on a contract basis (for quite a bit of money), and both contractors have independently hit the following exception. What we are doing is: 1. Installing Spark 0.9.1 according to the documentation on the website, along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs. 2. Building a fat jar with a Spark app with sbt then trying to run it on the cluster I've also included code snippets, and sbt deps at the bottom. When I've Googled this, there seem to be two somewhat vague responses: a) Mismatching spark versions on nodes/user code b) Need to add more jars to the SparkConf Now I know that (b) is not the problem, having successfully run the same code on other clusters while only including one jar (it's a fat jar). But I have no idea how to check for (a) - it appears Spark doesn't have any version checks or anything - it would be nice if it checked versions and threw a mismatching version exception: you have user code using version X and node Y has version Z. I would be very grateful for advice on this.
The exception: Exception in thread main org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 32 times (most recent failure: Exception failure: java.lang.IllegalStateException: unread block data) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386
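A rough sketch of the checksum-stream debugging technique described in the reply above (illustrative only, not the actual internal code): each chunk is written with its length and CRC32, so the reading side can pinpoint exactly where a compressed or serialized stream gets corrupted.
{code}
import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream}
import java.util.zip.CRC32

// Write a chunk prefixed with its length and checksum.
def writeChunk(out: OutputStream, chunk: Array[Byte]): Unit = {
  val crc = new CRC32
  crc.update(chunk)
  val dos = new DataOutputStream(out)
  dos.writeInt(chunk.length)
  dos.writeLong(crc.getValue)
  dos.write(chunk)
  dos.flush()
}

// Read a chunk back and fail loudly at the first corrupted chunk.
def readChunk(in: InputStream): Array[Byte] = {
  val dis = new DataInputStream(in)
  val len = dis.readInt()
  val expected = dis.readLong()
  val chunk = new Array[Byte](len)
  dis.readFully(chunk)
  val crc = new CRC32
  crc.update(chunk)
  require(crc.getValue == expected, "Checksum mismatch: stream corrupted at this chunk")
  chunk
}
{code}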
[jira] [Commented] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections
[ https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742 ] Mridul Muralidharan commented on SPARK-704: --- If the remote node goes down, SendingConnection would be notified since it is also registered for read events (to handle precisely this case actually). ReceivingConnection would anyway be notified since it is waiting on reads on that socket. This, of course, assumes that the local node detects the remote node failure at the TCP layer. Problems come in when ConnectionManager sometimes cannot detect loss of sending connections - Key: SPARK-704 URL: https://issues.apache.org/jira/browse/SPARK-704 Project: Spark Issue Type: Bug Reporter: Charles Reiss Assignee: Henry Saputra ConnectionManager currently does not detect when SendingConnections disconnect except if it is trying to send through them. As a result, a node failure just after a connection is initiated but before any acknowledgement messages can be sent may result in a hang. ConnectionManager has code intended to detect this case by detecting the failure of a corresponding ReceivingConnection, but this code assumes that the remote host:port of the ReceivingConnection is the same as the ConnectionManagerId, which is almost never true. Additionally, there does not appear to be any reason to assume a corresponding ReceivingConnection will exist. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (SPARK-704) ConnectionManager sometimes cannot detect loss of sending connections
[ https://issues.apache.org/jira/browse/SPARK-704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039742#comment-14039742 ] Mridul Muralidharan edited comment on SPARK-704 at 6/21/14 9:10 AM: If the remote node goes down, SendingConnection would be notified since it is also registered for read events (to handle precisely this case actually). ReceivingConnection would be notified since it is waiting on reads on that socket. This, of course, assumes that the local node detects the remote node failure at the TCP layer. Problems come in when this is not detected due to no activity on the socket (at the app and socket level - keepalive timeout, etc). Usually this is detected via application-level ping/keepalive messages : not sure if we want to introduce that into Spark ... was (Author: mridulm80): If the remote node goes down, SendingConnection would be notified since it is also registered for read events (to handle precisely this case actually). ReceivingConnection would anyway be notified since it is waiting on reads on that socket. This, of course, assumes that the local node detects the remote node failure at the TCP layer. Problems come in when ConnectionManager sometimes cannot detect loss of sending connections - Key: SPARK-704 URL: https://issues.apache.org/jira/browse/SPARK-704 Project: Spark Issue Type: Bug Reporter: Charles Reiss Assignee: Henry Saputra ConnectionManager currently does not detect when SendingConnections disconnect except if it is trying to send through them. As a result, a node failure just after a connection is initiated but before any acknowledgement messages can be sent may result in a hang. ConnectionManager has code intended to detect this case by detecting the failure of a corresponding ReceivingConnection, but this code assumes that the remote host:port of the ReceivingConnection is the same as the ConnectionManagerId, which is almost never true. Additionally, there does not appear to be any reason to assume a corresponding ReceivingConnection will exist. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2223) Building and running tests with maven is extremely slow
[ https://issues.apache.org/jira/browse/SPARK-2223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039217#comment-14039217 ] Mridul Muralidharan commented on SPARK-2223: [~tgraves] You could try running zinc - speeds up the maven build quite a bit. I find that I need to shutdown and restart it at times though .. but otherwise, works fine. Building and running tests with maven is extremely slow --- Key: SPARK-2223 URL: https://issues.apache.org/jira/browse/SPARK-2223 Project: Spark Issue Type: Bug Components: Build Affects Versions: 1.0.0 Reporter: Thomas Graves For some reason using maven with Spark is extremely slow. Building and running tests takes way longer then other projects I have used that use maven. We should investigate to see why. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14039236#comment-14039236 ] Mridul Muralidharan commented on SPARK-2089: [~pwendell] SplitInfo is not from hadoop - it gives the locality preference in the context of Spark (see org.apache.spark.scheduler.SplitInfo) in a reasonably API-agnostic way. The default support provided for it is Hadoop-specific, based on DFS blocks - but I don't think there is anything stopping us from expressing other forms (either already currently or with minor modifications as applicable). We actually very heavily use that API - moving 10s or 100s of TB of data tends to be fairly expensive :-) Since we are still stuck on 0.9 + changes, we have not yet faced this issue though, so it is great to see this being addressed. With YARN, preferredNodeLocalityData isn't honored --- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza Assignee: Sandy Ryza Priority: Critical When running in YARN cluster mode, apps can pass preferred locality data when constructing a Spark context that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code that the SparkContext has been created. The Spark-Yarn code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty unset version. This occurred during all of my runs. -- This message was sent by Atlassian JIRA (v6.2#6252)
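For context, a hedged sketch of how this API was meant to be used in the 0.9/1.0 era being discussed (the path and input format are placeholders, and the InputFormatInfo/SparkContext signatures may have shifted in later releases): compute host -> SplitInfo preferences for the job's input and hand them to the SparkContext constructor so the YARN allocator can request containers close to the data.
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.InputFormatInfo

// Sketch: derive preferred locations from the input's DFS blocks and pass them
// to SparkContext so container requests can be made near the data.
val conf = new SparkConf().setAppName("locality-aware-app")
val inputs = Seq(
  new InputFormatInfo(new Configuration(), classOf[TextInputFormat], "hdfs:///data/input"))
val preferredNodeLocationData =
  InputFormatInfo.computePreferredLocations(inputs)
    .map { case (host, splits) => (host, splits.toSet) }
    .toMap
val sc = new SparkContext(conf, preferredNodeLocationData)
{code}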
Re: Java IO Stream Corrupted - Invalid Type AC?
On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: Patrick, My team is using shuffle consolidation but not speculation. We are also using persist(DISK_ONLY) for caching. Use of shuffle consolidation is probably what is causing the issue. Would be good idea to try again with that turned off (which is the default). It should get fixed most likely in 1.1 timeframe. Regards, Mridul Here are some config changes that are in our work-in-progress. We've been trying for 2 weeks to get our production flow (maybe around 50-70 stages, a few forks and joins with up to 20 branches in the forks) to run end to end without any success, running into other problems besides this one as well. For example, we have run into situations where saving to HDFS just hangs on a couple of tasks, which are printing out nothing in their logs and not taking any CPU. For testing, our input data is 10 GB across 320 input splits and generates maybe around 200-300 GB of intermediate and final data. conf.set(spark.executor.memory, 14g) // TODO make this configurable // shuffle configs conf.set(spark.default.parallelism, 320) // TODO make this configurable conf.set(spark.shuffle.consolidateFiles,true) conf.set(spark.shuffle.file.buffer.kb, 200) conf.set(spark.reducer.maxMbInFlight, 96) conf.set(spark.rdd.compress,true // we ran into a problem with the default timeout of 60 seconds // this is also being set in the master's spark-env.sh. Not sure if it needs to be in both places conf.set(spark.worker.timeout,180) // akka settings conf.set(spark.akka.threads, 300) conf.set(spark.akka.timeout, 180) conf.set(spark.akka.frameSize, 100) conf.set(spark.akka.batchSize, 30) conf.set(spark.akka.askTimeout, 30) // block manager conf.set(spark.storage.blockManagerTimeoutIntervalMs, 18) conf.set(spark.blockManagerHeartBeatMs, 8) -Suren On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell pwend...@gmail.com wrote: Out of curiosity - are you guys using speculation, shuffle consolidation, or any other non-default option? If so that would help narrow down what's causing this corruption. On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matt/Ryan, Did you make any headway on this? My team is running into this also. Doesn't happen on smaller datasets. Our input set is about 10 GB but we generate 100s of GBs in the flow itself. -Suren On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton compton.r...@gmail.com wrote: Just ran into this today myself. I'm on branch-1.0 using a CDH3 cluster (no modifications to Spark or its dependencies). The error appeared trying to run GraphX's .connectedComponents() on a ~200GB edge list (GraphX worked beautifully on smaller data). Here's the stacktrace (it's quite similar to yours https://imgur.com/7iBA4nJ ). 
14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355) java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192) org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
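The setting Mridul points at, as it would look in the config block quoted above (shuffle file consolidation is off by default, so removing the line has the same effect as setting it to false):
{code}
// Rule shuffle file consolidation out as the culprit by leaving it at its default.
conf.set("spark.shuffle.consolidateFiles", "false")
{code}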
[jira] [Commented] (SPARK-1353) IllegalArgumentException when writing to disk
[ https://issues.apache.org/jira/browse/SPARK-1353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14033625#comment-14033625 ] Mridul Muralidharan commented on SPARK-1353: This is due to limitation in spark which is being addressed in https://issues.apache.org/jira/browse/SPARK-1476. IllegalArgumentException when writing to disk - Key: SPARK-1353 URL: https://issues.apache.org/jira/browse/SPARK-1353 Project: Spark Issue Type: Bug Components: Block Manager Environment: AWS EMR 3.2.30-49.59.amzn1.x86_64 #1 SMP x86_64 GNU/Linux Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4 built 2014-03-18 Reporter: Jim Blomo Priority: Minor The Executor may fail when trying to mmap a file bigger than Integer.MAX_VALUE due to the constraints of FileChannel.map (http://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#map(java.nio.channels.FileChannel.MapMode, long, long)). The signature takes longs, but the size value must be less than MAX_VALUE. This manifests with the following backtrace: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:337) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:281) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:430) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:38) at org.apache.spark.rdd.RDD.iterator(RDD.scala:220) at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85) -- This message was sent by Atlassian JIRA (v6.2#6252)
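To make the limitation concrete, here is an illustrative sketch (this is not the SPARK-1476 implementation): FileChannel.map rejects lengths above Integer.MAX_VALUE, so a file larger than 2GB has to be mapped as several smaller regions instead of a single buffer.
{code}
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Map a possibly >2GB file as a sequence of regions, each within the map() limit.
def mapInChunks(path: String, chunkSize: Long = Integer.MAX_VALUE.toLong): Seq[MappedByteBuffer] = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    val length = channel.size()
    (0L until length by chunkSize).map { offset =>
      channel.map(FileChannel.MapMode.READ_ONLY, offset, math.min(chunkSize, length - offset))
    }
  } finally {
    channel.close()
  }
}
{code}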
[jira] [Commented] (SPARK-2018) Big-Endian (IBM Power7) Spark Serialization issue
[ https://issues.apache.org/jira/browse/SPARK-2018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14027808#comment-14027808 ] Mridul Muralidharan commented on SPARK-2018: Ah ! This is an interesting bug. Default spark uses java serialization ... so should not be an issue : but yet you are facing it ! (I am assuming you have not customized serialization). Is it possible for you to dump data written and read at both ends ? The env vars and jvm details ? Actually, spark does not do anything fancy for default serialization : so a simple example code without spark in picture could also be tried (write to file on master node, and read from the file in slave node - and see if it works) Big-Endian (IBM Power7) Spark Serialization issue -- Key: SPARK-2018 URL: https://issues.apache.org/jira/browse/SPARK-2018 Project: Spark Issue Type: Bug Affects Versions: 1.0.0 Environment: hardware : IBM Power7 OS:Linux version 2.6.32-358.el6.ppc64 (mockbu...@ppc-017.build.eng.bos.redhat.com) (gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC) ) #1 SMP Tue Jan 29 11:43:27 EST 2013 JDK: Java(TM) SE Runtime Environment (build pxp6470sr5-20130619_01(SR5)) IBM J9 VM (build 2.6, JRE 1.7.0 Linux ppc64-64 Compressed References 20130617_152572 (JIT enabled, AOT enabled) Hadoop:Hadoop-0.2.3-CDH5.0 Spark:Spark-1.0.0 or Spark-0.9.1 spark-env.sh: export JAVA_HOME=/opt/ibm/java-ppc64-70/ export SPARK_MASTER_IP=9.114.34.69 export SPARK_WORKER_MEMORY=1m export SPARK_CLASSPATH=/home/test1/spark-1.0.0-bin-hadoop2/lib export STANDALONE_SPARK_MASTER_HOST=9.114.34.69 #export SPARK_JAVA_OPTS=' -Xdebug -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n ' Reporter: Yanjie Gao We have an application run on Spark on Power7 System . But we meet an important issue about serialization. The example HdfsWordCount can meet the problem. ./bin/run-example org.apache.spark.examples.streaming.HdfsWordCount localdir We used Power7 (Big-Endian arch) and Redhat 6.4. Big-Endian is the main cause since the example ran successfully in another Power-based Little Endian setup. here is the exception stack and log: Spark Executor Command: /opt/ibm/java-ppc64-70//bin/java -cp /home/test1/spark-1.0.0-bin-hadoop2/lib::/home/test1/src/spark-1.0.0-bin-hadoop2/conf:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/test1/src/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/:/home/test1/src/hadoop-2.3.0-cdh5.0.0/etc/hadoop/ -XX:MaxPermSize=128m -Xdebug -Xrunjdwp:transport=dt_socket,address=9,server=y,suspend=n -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler 2 p7hvs7br16 4 akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker app-20140604023054- 14/06/04 02:31:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 14/06/04 02:31:21 INFO spark.SecurityManager: Changing view acls to: test1,yifeng 14/06/04 02:31:21 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(test1, yifeng) 14/06/04 02:31:22 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/06/04 02:31:22 INFO Remoting: Starting remoting 14/06/04 02:31:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@p7hvs7br16:39658] 14/06/04 02:31:22 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@p7hvs7br16:39658] 14/06/04 02:31:22 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@9.186.105.141:60253/user/CoarseGrainedScheduler 14/06/04 02:31:22 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker 14/06/04 02:31:23 INFO worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@p7hvs7br16:59240/user/Worker 14/06/04 02:31:24 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver 14/06/04 02:31:24 INFO spark.SecurityManager: Changing view acls to: test1,yifeng 14/06/04 02:31:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(test1, yifeng) 14/06/04 02:31:24 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/06/04 02:31:24 INFO Remoting: Starting remoting 14/06/04 02:31:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark
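A hedged sketch of the standalone check suggested in the comment above (class and file names are made up for illustration): run writeSample on the big-endian node and readSample on the little-endian node, with the file copied across. Plain JDK serialization is endian-neutral, so a failure here would point at the JVM or classpath rather than Spark.
{code}
import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

case class Sample(id: Int, payload: Array[Byte]) extends Serializable

// Run on the writing node.
def writeSample(path: String): Unit = {
  val out = new ObjectOutputStream(new FileOutputStream(path))
  try out.writeObject(Sample(42, Array.fill[Byte](16)(1))) finally out.close()
}

// Run on the reading node after copying the file over.
def readSample(path: String): Sample = {
  val in = new ObjectInputStream(new FileInputStream(path))
  try in.readObject().asInstanceOf[Sample] finally in.close()
}
{code}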
[jira] [Commented] (SPARK-2089) With YARN, preferredNodeLocalityData isn't honored
[ https://issues.apache.org/jira/browse/SPARK-2089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14026397#comment-14026397 ] Mridul Muralidharan commented on SPARK-2089: preferredNodeLocationData used to be passed as a constructor parameter - and so was always available. The rearrangement of SparkContext initialization has introduced this bug. With YARN, preferredNodeLocalityData isn't honored --- Key: SPARK-2089 URL: https://issues.apache.org/jira/browse/SPARK-2089 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Sandy Ryza When running in YARN cluster mode, apps can pass preferred locality data when constructing a Spark context that will dictate where to request executor containers. This is currently broken because of a race condition. The Spark-YARN code runs the user class and waits for it to start up a SparkContext. During its initialization, the SparkContext will create a YarnClusterScheduler, which notifies a monitor in the Spark-YARN code that the SparkContext has been created. The Spark-Yarn code then immediately fetches the preferredNodeLocationData from the SparkContext and uses it to start requesting containers. But in the SparkContext constructor that takes the preferredNodeLocationData, setting preferredNodeLocationData comes after the rest of the initialization, so, if the Spark-YARN code comes around quickly enough after being notified, the data that's fetched is the empty unset version. This occurred during all of my runs. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020789#comment-14020789 ] Mridul Muralidharan commented on SPARK-2064: Depending on how long a job runs, this can cause OOM on the master. In YARN (and Mesos?), an executor on the same node gets a different port if relaunched on failure - and so ends up as a different executor in the list. web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14020936#comment-14020936 ] Mridul Muralidharan commented on SPARK-2064: It is 100 MB (or more) of memory which could be used elsewhere. In our clusters, for example, the number of workers can be very high while the containers can be quite ephemeral when under load (and so a lot of container losses); on the other hand, memory per container is constrained to about 8 gig (lower when we account for overheads, etc). So the amount of working memory in the master shrinks : we are finding that the UI and related codepaths are among the portions occupying a lot of memory in the OOM dumps of the master. web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021008#comment-14021008 ] Mridul Muralidharan commented on SPARK-2064: Unfortunately OOM is a very big issue for us since the application master is a single point of failure when running in yarn. Particularly when memory is constrained and vigorously enforced by the yarn containers (requiring higher overheads to be specified, reducing usable memory even further). Given this, and given the fair amount of churn already for executor containers, I am hesitant about features which add to the memory footprint of the UI even further. The cumulative impact of the ui is nontrivial as I mentioned before. This, for example, would require 1-8% of master memory when there is reasonable churn for long running jobs (30 hours) on a reasonable number of executors (200-300). web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2064) web ui should not remove executors if they are dead
[ https://issues.apache.org/jira/browse/SPARK-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14021011#comment-14021011 ] Mridul Muralidharan commented on SPARK-2064: I am probably missing the intent behind this change. What is the expected use case it is supposed to help with ? web ui should not remove executors if they are dead --- Key: SPARK-2064 URL: https://issues.apache.org/jira/browse/SPARK-2064 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin We should always show the list of executors that have ever been connected, and add a status column to mark them as dead if they have been disconnected. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-2017) web ui stage page becomes unresponsive when the number of tasks is large
[ https://issues.apache.org/jira/browse/SPARK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14019394#comment-14019394 ] Mridul Muralidharan commented on SPARK-2017: Currently, for our jobs, I run with spark.ui.retainedStages=3 (so that there is some visibility into past stages) : this is to prevent OOM's in the master when the number of tasks per stage is not low (50k for example is not very high imo). The stage details UI ranges from very sluggish to pretty much unresponsive for our jobs where tasks exceed 30k ... though that might also be a browser issue (firefox/chrome) ? web ui stage page becomes unresponsive when the number of tasks is large Key: SPARK-2017 URL: https://issues.apache.org/jira/browse/SPARK-2017 Project: Spark Issue Type: Sub-task Reporter: Reynold Xin Labels: starter {code} sc.parallelize(1 to 1000000, 1000000).count() {code} The above code creates one million tasks to be executed. The stage detail web ui page takes forever to load (if it ever completes). There are again a few different alternatives: 0. Limit the number of tasks we show. 1. Pagination 2. By default only show the aggregate metrics and failed tasks, and hide the successful ones. -- This message was sent by Atlassian JIRA (v6.2#6252)
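For reference, the cap mentioned in the comment above is an ordinary Spark configuration entry; a minimal sketch, assuming the spark.ui.retainedStages key available in the 0.9/1.0 line (app name illustrative):
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Retain only the last few completed stages in the UI to bound driver/master memory,
// at the cost of losing history for older stages.
val conf = new SparkConf()
  .setAppName("ui-memory-bounded-app")
  .set("spark.ui.retainedStages", "3")

val sc = new SparkContext(conf)
{code}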
[jira] [Commented] (SPARK-1956) Enable shuffle consolidation by default
[ https://issues.apache.org/jira/browse/SPARK-1956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14011741#comment-14011741 ] Mridul Muralidharan commented on SPARK-1956: Shuffle consolidation MUST NOT be enabled - whether by default or intentionally. In 1.0 it is very badly broken - we needed a whole litany of fixes before it became reasonably stable. The current plan is to contribute most of these back in the 1.1 timeframe. Enable shuffle consolidation by default --- Key: SPARK-1956 URL: https://issues.apache.org/jira/browse/SPARK-1956 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.0.0 Reporter: Sandy Ryza The only drawbacks are on ext3, and most everyone has ext4 at this point. I think it's better to aim the default at the common case. -- This message was sent by Atlassian JIRA (v6.2#6252)
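The flag being debated here is, as far as I can tell, spark.shuffle.consolidateFiles, which is off by default in 1.0; a minimal sketch of leaving it explicitly disabled for one application (app name illustrative):
{code}
import org.apache.spark.SparkConf

// Shuffle file consolidation is a per-application setting; given the issues described
// above, leaving it at its default of false is the conservative choice in 1.0.
val conf = new SparkConf()
  .setAppName("shuffle-config-example")
  .set("spark.shuffle.consolidateFiles", "false")
{code}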
[jira] [Commented] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing
[ https://issues.apache.org/jira/browse/SPARK-1855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14001377#comment-14001377 ] Mridul Muralidharan commented on SPARK-1855: Did not realize that mail replies to JIRA mails did not get mirrored to JIRA ! Replicating my mail here : – cut and paste – We don't have 3x replication in spark :-) And if we use replicated storagelevel, while decreasing odds of failure, it does not eliminate it (since we are not doing a great job with replication anyway from fault tolerance point of view). Also it does take a nontrivial performance hit with replicated levels. Regards, Mridul Provide memory-and-local-disk RDD checkpointing --- Key: SPARK-1855 URL: https://issues.apache.org/jira/browse/SPARK-1855 Project: Spark Issue Type: New Feature Components: MLlib, Spark Core Affects Versions: 1.0.0 Reporter: Xiangrui Meng Checkpointing is used to cut long lineage while maintaining fault tolerance. The current implementation is HDFS-based. Using the BlockRDD we can create in-memory-and-local-disk (with replication) checkpoints that are not as reliable as HDFS-based solution but faster. It can help applications that require many iterations. -- This message was sent by Atlassian JIRA (v6.2#6252)
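To make the trade-off being discussed concrete, here is a minimal sketch contrasting HDFS-based checkpointing with the replicated persist that the proposal (and the reply above) refer to. The paths and RDD are illustrative, and the persist variant neither truncates lineage nor survives the loss of all replicas.
{code}
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc: SparkContext = ???  // assumed to already exist

// Reliable but slow: lineage is truncated by materializing the RDD to HDFS.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")
val iterated = sc.textFile("hdfs:///data/edges").map(_.split("\t"))
iterated.checkpoint()

// Faster but weaker: keep a replicated in-memory/disk copy instead. Losing every
// replica of a partition still forces recomputation from the full lineage.
iterated.persist(StorageLevel.MEMORY_AND_DISK_2)
{code}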
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
On 18-May-2014 5:05 am, Mark Hamstra m...@clearstorydata.com wrote: I don't understand. We never said that interfaces wouldn't change from 0.9 Agreed. to 1.0. What we are committing to is stability going forward from the 1.0.0 baseline. Nobody is disputing that backward-incompatible behavior or interface changes would be an issue post-1.0.0. The question is whether The point is, how confident are we that these are the right set of interface definitions. We think it is, but we could also have gone through a 0.10 to vet the proposed 1.0 changes to stabilize them. To give examples for which we don't have solutions currently (which we are facing internally here btw, so not academic exercise) : - Current spark shuffle model breaks very badly as number of partitions increases (input and output). - As number of nodes increase, the overhead per node keeps going up. Spark currently is more geared towards large memory machines; when the RAM per node is modest (8 to 16 gig) but large number of them are available, it does not do too well. - Current block abstraction breaks as data per block goes beyond 2 gig. - Cogroup/join when value per key or number of keys (or both) is high breaks currently. - Shuffle consolidation is so badly broken it is not funny. - Currently there is no way of effectively leveraging accelerator cards/coprocessors/gpus from spark - to do so, I suspect we will need to redefine OFF_HEAP. - Effectively leveraging ssd is still an open question IMO when you have mix of both available. We have resolved some of these and looking at the rest. These are not unique to our internal usage profile, I have seen most of these asked elsewhere too. Thankfully some of the 1.0 changes actually are geared towards helping to alleviate some of the above (Iterable change for ex), most of the rest are internal impl detail of spark core which helps a lot - but there are cases where this is not so. Unfortunately I don't know yet if the unresolved/uninvestigated issues will require more changes or not. Given this I am very skeptical of expecting current spark interfaces to be sufficient for next 1 year (forget 3) I understand this is an argument which can be made to never release 1.0 :-) Which is why I was ok with a 1.0 instead of 0.10 release in spite of my preference. This is a good problem to have IMO ... People are using spark extensively and in circumstances that we did not envision : necessitating changes even to spark core. But the claim that 1.0 interfaces are stable is not something I buy - they are not, we will need to break them soon and cost of maintaining backward compatibility will be high. We just need to make an informed decision to live with that cost, not hand wave it away. Regards Mridul there is anything apparent now that is expected to require such disruptive changes if we were to commit to the current release candidate as our guaranteed 1.0.0 baseline. On Sat, May 17, 2014 at 2:05 PM, Mridul Muralidharan mri...@gmail.com wrote: I would make the case for interface stability not just api stability. Particularly given that we have significantly changed some of our interfaces, I want to ensure developers/users are not seeing red flags. Bugs and code stability can be addressed in minor releases if found, but behavioral change and/or interface changes would be a much more invasive issue for our users. 
Regards Mridul On 18-May-2014 2:19 am, Matei Zaharia matei.zaha...@gmail.com wrote: As others have said, the 1.0 milestone is about API stability, not about saying “we’ve eliminated all bugs”. The sooner you declare 1.0, the sooner users can confidently build on Spark, knowing that the application they build today will still run on Spark 1.9.9 three years from now. This is something that I’ve seen done badly (and experienced the effects thereof) in other big data projects, such as MapReduce and even YARN. The result is that you annoy users, you end up with a fragmented userbase where everyone is building against a different version, and you drastically slow down development. With a project as fast-growing as fast-growing as Spark in particular, there will be new bugs discovered and reported continuously, especially in the non-core components. Look at the graph of # of contributors in time to Spark: https://www.ohloh.net/p/apache-spark (bottom-most graph; “commits” changed when we started merging each patch as a single commit). This is not slowing down, and we need to have the culture now that we treat API stability and release numbers at the level expected for a 1.0 project instead of having people come in and randomly change the API. I’ll also note that the issues marked “blocker” were marked so by their reporters, since the reporter can set the priority. I don’t consider stuff like parallelize() not partitioning ranges in the same way as other collections
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
So I think I need to clarify a few things here - particularly since this mail went to the wrong mailing list and a much wider audience than I intended it for :-) Most of the issues I mentioned are internal implementation details of spark core : which means, we can enhance them in future without disruption to our userbase (ability to support large number of input/output partitions. Note: this is of order of 100k input and output partitions with uniform spread of keys - very rarely seen outside of some crazy jobs). Some of the issues I mentioned would require DeveloperApi changes - which are not user exposed : they would impact developer use of these api's - which are mostly internally provided by spark. (Like fixing blocks > 2G would require a change to the Serializer api) A smaller fraction might require interface changes - note, I am referring specifically to configuration changes (removing/deprecating some) and possibly newer options to submit/env, etc - I don't envision any programming api change itself. The only api change we did was from Seq -> Iterable - which is actually to address some of the issues I mentioned (join/cogroup). Remaining are bugs which need to be addressed or the feature removed/enhanced like shuffle consolidation. There might be semantic extension of some things like OFF_HEAP storage level to address other computation models - but that would not have an impact on end user - since other options would be pluggable with default set to Tachyon so that there is no user expectation change. So will the interface possibly change ? Sure, though we will try to keep it backwardly compatible (as we did with 1.0). Will the api change - other than backward compatible enhancements, probably not. Regards, Mridul On Sun, May 18, 2014 at 12:11 PM, Mridul Muralidharan mri...@gmail.com wrote: On 18-May-2014 5:05 am, Mark Hamstra m...@clearstorydata.com wrote: I don't understand. We never said that interfaces wouldn't change from 0.9 Agreed. to 1.0. What we are committing to is stability going forward from the 1.0.0 baseline. Nobody is disputing that backward-incompatible behavior or interface changes would be an issue post-1.0.0. The question is whether The point is, how confident are we that these are the right set of interface definitions. We think it is, but we could also have gone through a 0.10 to vet the proposed 1.0 changes to stabilize them. To give examples for which we don't have solutions currently (which we are facing internally here btw, so not academic exercise) : - Current spark shuffle model breaks very badly as number of partitions increases (input and output). - As number of nodes increase, the overhead per node keeps going up. Spark currently is more geared towards large memory machines; when the RAM per node is modest (8 to 16 gig) but large number of them are available, it does not do too well. - Current block abstraction breaks as data per block goes beyond 2 gig. - Cogroup/join when value per key or number of keys (or both) is high breaks currently. - Shuffle consolidation is so badly broken it is not funny. - Currently there is no way of effectively leveraging accelerator cards/coprocessors/gpus from spark - to do so, I suspect we will need to redefine OFF_HEAP. - Effectively leveraging ssd is still an open question IMO when you have mix of both available. We have resolved some of these and looking at the rest. These are not unique to our internal usage profile, I have seen most of these asked elsewhere too. 
Thankfully some of the 1.0 changes actually are geared towards helping to alleviate some of the above (Iterable change for ex), most of the rest are internal impl detail of spark core which helps a lot - but there are cases where this is not so. Unfortunately I don't know yet if the unresolved/uninvestigated issues will require more changes or not. Given this I am very skeptical of expecting current spark interfaces to be sufficient for next 1 year (forget 3) I understand this is an argument which can be made to never release 1.0 :-) Which is why I was ok with a 1.0 instead of 0.10 release in spite of my preference. This is a good problem to have IMO ... People are using spark extensively and in circumstances that we did not envision : necessitating changes even to spark core. But the claim that 1.0 interfaces are stable is not something I buy - they are not, we will need to break them soon and cost of maintaining backward compatibility will be high. We just need to make an informed decision to live with that cost, not hand wave it away. Regards Mridul there is anything apparent now that is expected to require such disruptive changes if we were to commit to the current release candidate as our guaranteed 1.0.0 baseline. On Sat, May 17, 2014 at 2:05 PM, Mridul Muralidharan mri...@gmail.comwrote: I would make the case for interface stability not just api stability. Particularly given
Re: [jira] [Created] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing
My bad ... I was replying via mobile, and I did not realize responses to JIRA mails were not mirrored to JIRA - unlike PR responses ! Regards, Mridul On Sun, May 18, 2014 at 2:50 AM, Matei Zaharia matei.zaha...@gmail.com wrote: We do actually have replicated StorageLevels in Spark. You can use MEMORY_AND_DISK_2 or construct your own StorageLevel with your own custom replication factor. BTW you guys should probably have this discussion on the JIRA rather than the dev list; I think the replies somehow ended up on the dev list. Matei On May 17, 2014, at 1:36 AM, Mridul Muralidharan mri...@gmail.com wrote: We don't have 3x replication in spark :-) And if we use replicated storagelevel, while decreasing odds of failure, it does not eliminate it (since we are not doing a great job with replication anyway from fault tolerance point of view). Also it does take a nontrivial performance hit with replicated levels. Regards, Mridul On 17-May-2014 8:16 am, Xiangrui Meng men...@gmail.com wrote: With 3x replication, we should be able to achieve fault tolerance. This checkPointed RDD can be cleared if we have another in-memory checkPointed RDD down the line. It can avoid hitting disk if we have enough memory to use. We need to investigate more to find a good solution. -Xiangrui On Fri, May 16, 2014 at 4:00 PM, Mridul Muralidharan mri...@gmail.com wrote: Effectively this is persist without fault tolerance. Failure of any node means complete lack of fault tolerance. I would be very skeptical of truncating lineage if it is not reliable. On 17-May-2014 3:49 am, Xiangrui Meng (JIRA) j...@apache.org wrote: Xiangrui Meng created SPARK-1855: Summary: Provide memory-and-local-disk RDD checkpointing Key: SPARK-1855 URL: https://issues.apache.org/jira/browse/SPARK-1855 Project: Spark Issue Type: New Feature Components: MLlib, Spark Core Affects Versions: 1.0.0 Reporter: Xiangrui Meng Checkpointing is used to cut long lineage while maintaining fault tolerance. The current implementation is HDFS-based. Using the BlockRDD we can create in-memory-and-local-disk (with replication) checkpoints that are not as reliable as HDFS-based solution but faster. It can help applications that require many iterations. -- This message was sent by Atlassian JIRA (v6.2#6252)
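For completeness, a sketch of the two options Matei mentions - the built-in 2x-replicated level and a custom replication factor. The exact StorageLevel.apply overloads have shifted between versions, so treat the custom constructor below as an assumption rather than a fixed signature.
{code}
import org.apache.spark.storage.StorageLevel

// Built-in level with two replicas of each partition.
val twoCopies = StorageLevel.MEMORY_AND_DISK_2

// Custom replication factor: (useDisk, useMemory, deserialized, replication).
val threeCopies = StorageLevel(true, true, false, 3)

// someRdd.persist(threeCopies)
{code}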
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
I had echoed similar sentiments a while back when there was a discussion around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the api changes, add missing functionality, go through a hardening release before 1.0 But the community preferred a 1.0 :-) Regards, Mridul On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote: On this note, non-binding commentary: Releases happen in local minima of change, usually created by internally enforced code freeze. Spark is incredibly busy now due to external factors -- recently a TLP, recently discovered by a large new audience, ease of contribution enabled by Github. It's getting like the first year of mainstream battle-testing in a month. It's been very hard to freeze anything! I see a number of non-trivial issues being reported, and I don't think it has been possible to triage all of them, even. Given the high rate of change, my instinct would have been to release 0.10.0 now. But won't it always be very busy? I do think the rate of significant issues will slow down. Version ain't nothing but a number, but if it has any meaning it's the semantic versioning meaning. 1.0 imposes extra handicaps around striving to maintain backwards-compatibility. That may end up being bent to fit in important changes that are going to be required in this continuing period of change. Hadoop does this all the time unfortunately and gets away with it, I suppose -- minor version releases are really major. (On the other extreme, HBase is at 0.98 and quite production-ready.) Just consider this a second vote for focus on fixes and 1.0.x rather than new features and 1.x. I think there are a few steps that could streamline triage of this flood of contributions, and make all of this easier, but that's for another thread. On Fri, May 16, 2014 at 8:50 PM, Mark Hamstra m...@clearstorydata.com wrote: +1, but just barely. We've got quite a number of outstanding bugs identified, and many of them have fixes in progress. I'd hate to see those efforts get lost in a post-1.0.0 flood of new features targeted at 1.1.0 -- in other words, I'd like to see 1.0.1 retain a high priority relative to 1.1.0. Looking through the unresolved JIRAs, it doesn't look like any of the identified bugs are show-stoppers or strictly regressions (although I will note that one that I have in progress, SPARK-1749, is a bug that we introduced with recent work -- it's not strictly a regression because we had equally bad but different behavior when the DAGScheduler exceptions weren't previously being handled at all vs. being slightly mis-handled now), so I'm not currently seeing a reason not to release.
Re: [jira] [Created] (SPARK-1867) Spark Documentation Error causes java.lang.IllegalStateException: unread block data
I suspect this is an issue we have fixed internally here as part of a larger change - the issue we fixed was not a config issue but bugs in spark. Unfortunately we plan to contribute this as part of 1.1 Regards, Mridul On 17-May-2014 4:09 pm, sam (JIRA) j...@apache.org wrote: sam created SPARK-1867: -- Summary: Spark Documentation Error causes java.lang.IllegalStateException: unread block data Key: SPARK-1867 URL: https://issues.apache.org/jira/browse/SPARK-1867 Project: Spark Issue Type: Bug Reporter: sam I've employed two System Administrators on a contract basis (for quite a bit of money), and both contractors have independently hit the following exception. What we are doing is: 1. Installing Spark 0.9.1 according to the documentation on the website, along with CDH4 (and another cluster with CDH5) distros of hadoop/hdfs. 2. Building a fat jar with a Spark app with sbt then trying to run it on the cluster I've also included code snippets, and sbt deps at the bottom. When I've Googled this, there seems to be two somewhat vague responses: a) Mismatching spark versions on nodes/user code b) Need to add more jars to the SparkConf Now I know that (b) is not the problem having successfully run the same code on other clusters while only including one jar (it's a fat jar). But I have no idea how to check for (a) - it appears Spark doesn't have any version checks or anything - it would be nice if it checked versions and threw a mismatching version exception: you have user code using version X and node Y has version Z. I would be very grateful for advice on this. The exception: Exception in thread main org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 32 times (most recent failure: Exception failure: java.lang.IllegalStateException: unread block data) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/05/16 18:05:31 INFO scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 59] My code 
snippet: val conf = new SparkConf() .setMaster(clusterMaster) .setAppName(appName) .setSparkHome(sparkHome) .setJars(SparkContext.jarOfClass(this.getClass)) println("count = " + new SparkContext(conf).textFile(someHdfsPath).count()) My SBT dependencies: // relevant "org.apache.spark" % "spark-core_2.10" % "0.9.1", "org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0", // standard, probably unrelated "com.github.seratch" %% "awscala" % "[0.2,)", "org.scalacheck" %% "scalacheck" % "1.10.1" % "test", "org.specs2" %% "specs2" % "1.14" % "test", "org.scala-lang" % "scala-reflect" % "2.10.3", "org.scalaz" %% "scalaz-core" % "7.0.5", "net.minidev" % "json-smart" % "1.2" -- This message was sent by Atlassian JIRA (v6.2#6252)
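One common way to rule out cause (a) above - mismatched Spark versions between the fat jar and the cluster - is to pin the build to the exact deployed version and mark Spark as provided so the cluster's own jars are used at runtime. A build.sbt sketch; the version strings are illustrative and must match the actual cluster:
{code}
// build.sbt (sketch): keep Spark/Hadoop out of the fat jar so the application always
// runs against the versions installed on the cluster.
libraryDependencies ++= Seq(
  "org.apache.spark"  % "spark-core_2.10" % "0.9.1" % "provided",
  "org.apache.hadoop" % "hadoop-client"   % "2.3.0-mr1-cdh5.0.0" % "provided"
)
{code}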
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
We made incompatible api changes whose impact we don't know yet completely : both from implementation and usage point of view. We had the option of getting real-world feedback from the user community if we had gone to 0.10 but the spark developers seemed to be in a hurry to get to 1.0 - so I made my opinion known but left it to the wisdom of larger group of committers to decide ... I did not think it was critical enough to do a binding -1 on. Regards Mridul On 17-May-2014 9:43 pm, Mark Hamstra m...@clearstorydata.com wrote: Which of the unresolved bugs in spark-core do you think will require an API-breaking change to fix? If there are none of those, then we are still essentially on track for a 1.0.0 release. The number of contributions and pace of change now is quite high, but I don't think that waiting for the pace to slow before releasing 1.0 is viable. If Spark's short history is any guide to its near future, the pace will not slow by any significant amount for any noteworthy length of time, but rather will continue to increase. What we need to be aiming for, I think, is to have the great majority of those new contributions being made to MLLlib, GraphX, SparkSQL and other areas of the code that we have clearly marked as not frozen in 1.x. I think we are already seeing that, but if I am just not recognizing breakage of our semantic versioning guarantee that will be forced on us by some pending changes, now would be a good time to set me straight. On Sat, May 17, 2014 at 4:26 AM, Mridul Muralidharan mri...@gmail.com wrote: I had echoed similar sentiments a while back when there was a discussion around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the api changes, add missing functionality, go through a hardening release before 1.0 But the community preferred a 1.0 :-) Regards, Mridul On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote: On this note, non-binding commentary: Releases happen in local minima of change, usually created by internally enforced code freeze. Spark is incredibly busy now due to external factors -- recently a TLP, recently discovered by a large new audience, ease of contribution enabled by Github. It's getting like the first year of mainstream battle-testing in a month. It's been very hard to freeze anything! I see a number of non-trivial issues being reported, and I don't think it has been possible to triage all of them, even. Given the high rate of change, my instinct would have been to release 0.10.0 now. But won't it always be very busy? I do think the rate of significant issues will slow down. Version ain't nothing but a number, but if it has any meaning it's the semantic versioning meaning. 1.0 imposes extra handicaps around striving to maintain backwards-compatibility. That may end up being bent to fit in important changes that are going to be required in this continuing period of change. Hadoop does this all the time unfortunately and gets away with it, I suppose -- minor version releases are really major. (On the other extreme, HBase is at 0.98 and quite production-ready.) Just consider this a second vote for focus on fixes and 1.0.x rather than new features and 1.x. I think there are a few steps that could streamline triage of this flood of contributions, and make all of this easier, but that's for another thread. On Fri, May 16, 2014 at 8:50 PM, Mark Hamstra m...@clearstorydata.com wrote: +1, but just barely. We've got quite a number of outstanding bugs identified, and many of them have fixes in progress. 
I'd hate to see those efforts get lost in a post-1.0.0 flood of new features targeted at 1.1.0 -- in other words, I'd like to see 1.0.1 retain a high priority relative to 1.1.0. Looking through the unresolved JIRAs, it doesn't look like any of the identified bugs are show-stoppers or strictly regressions (although I will note that one that I have in progress, SPARK-1749, is a bug that we introduced with recent work -- it's not strictly a regression because we had equally bad but different behavior when the DAGScheduler exceptions weren't previously being handled at all vs. being slightly mis-handled now), so I'm not currently seeing a reason not to release.
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
On 17-May-2014 11:40 pm, Mark Hamstra m...@clearstorydata.com wrote: That is a past issue that we don't need to be re-opening now. The present Huh ? If we need to revisit based on changed circumstances, we must - the scope of changes introduced in this release was definitely not anticipated when 1.0 vs 0.10 discussion happened. If folks are worried about stability of core; it is a valid concern IMO. Having said that, I am still ok with going to 1.0; but if a conversation starts about need for 1.0 vs going to 0.10 I want to hear more and possibly allay the concerns and not try to muzzle the discussion. Regards Mridul issue, and what I am asking, is which pending bug fixes does anyone anticipate will require breaking the public API guaranteed in rc9 On Sat, May 17, 2014 at 9:44 AM, Mridul Muralidharan mri...@gmail.com wrote: We made incompatible api changes whose impact we don't know yet completely : both from implementation and usage point of view. We had the option of getting real-world feedback from the user community if we had gone to 0.10 but the spark developers seemed to be in a hurry to get to 1.0 - so I made my opinion known but left it to the wisdom of larger group of committers to decide ... I did not think it was critical enough to do a binding -1 on. Regards Mridul On 17-May-2014 9:43 pm, Mark Hamstra m...@clearstorydata.com wrote: Which of the unresolved bugs in spark-core do you think will require an API-breaking change to fix? If there are none of those, then we are still essentially on track for a 1.0.0 release. The number of contributions and pace of change now is quite high, but I don't think that waiting for the pace to slow before releasing 1.0 is viable. If Spark's short history is any guide to its near future, the pace will not slow by any significant amount for any noteworthy length of time, but rather will continue to increase. What we need to be aiming for, I think, is to have the great majority of those new contributions being made to MLLlib, GraphX, SparkSQL and other areas of the code that we have clearly marked as not frozen in 1.x. I think we are already seeing that, but if I am just not recognizing breakage of our semantic versioning guarantee that will be forced on us by some pending changes, now would be a good time to set me straight. On Sat, May 17, 2014 at 4:26 AM, Mridul Muralidharan mri...@gmail.com wrote: I had echoed similar sentiments a while back when there was a discussion around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the api changes, add missing functionality, go through a hardening release before 1.0 But the community preferred a 1.0 :-) Regards, Mridul On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote: On this note, non-binding commentary: Releases happen in local minima of change, usually created by internally enforced code freeze. Spark is incredibly busy now due to external factors -- recently a TLP, recently discovered by a large new audience, ease of contribution enabled by Github. It's getting like the first year of mainstream battle-testing in a month. It's been very hard to freeze anything! I see a number of non-trivial issues being reported, and I don't think it has been possible to triage all of them, even. Given the high rate of change, my instinct would have been to release 0.10.0 now. But won't it always be very busy? I do think the rate of significant issues will slow down. Version ain't nothing but a number, but if it has any meaning it's the semantic versioning meaning. 
1.0 imposes extra handicaps around striving to maintain backwards-compatibility. That may end up being bent to fit in important changes that are going to be required in this continuing period of change. Hadoop does this all the time unfortunately and gets away with it, I suppose -- minor version releases are really major. (On the other extreme, HBase is at 0.98 and quite production-ready.) Just consider this a second vote for focus on fixes and 1.0.x rather than new features and 1.x. I think there are a few steps that could streamline triage of this flood of contributions, and make all of this easier, but that's for another thread. On Fri, May 16, 2014 at 8:50 PM, Mark Hamstra m...@clearstorydata.com wrote: +1, but just barely. We've got quite a number of outstanding bugs identified, and many of them have fixes in progress. I'd hate to see those efforts get lost in a post-1.0.0 flood of new features targeted at 1.1.0 -- in other words, I'd like to see 1.0.1 retain a high priority relative to 1.1.0. Looking through the unresolved JIRAs, it doesn't look like
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
On 18-May-2014 1:45 am, Mark Hamstra m...@clearstorydata.com wrote: I'm not trying to muzzle the discussion. All I am saying is that we don't need to have the same discussion about 0.10 vs. 1.0 that we already had. Agreed, no point in repeating the same discussion ... I am also trying to understand what the concerns are. Specifically though, the scope of 1.0 (in terms of changes) went up quite a bit - a lot of which are new changes and features; not just the initially envisioned api changes and stability fixes. If this is raising concerns, particularly since lot of users are depending on stability of spark interfaces (api, env, scripts, behavior); I want to understand better what they are - and if they are legitimately serious enough, we will need to revisit decision to go to 1.0 instead of 0.10 ... I hope we don't need to though given how late we are in dev cycle Regards Mridul If you can tell me about specific changes in the current release candidate that occasion new arguments for why a 1.0 release is an unacceptable idea, then I'm listening. On Sat, May 17, 2014 at 11:59 AM, Mridul Muralidharan mri...@gmail.com wrote: On 17-May-2014 11:40 pm, Mark Hamstra m...@clearstorydata.com wrote: That is a past issue that we don't need to be re-opening now. The present Huh ? If we need to revisit based on changed circumstances, we must - the scope of changes introduced in this release was definitely not anticipated when 1.0 vs 0.10 discussion happened. If folks are worried about stability of core; it is a valid concern IMO. Having said that, I am still ok with going to 1.0; but if a conversation starts about need for 1.0 vs going to 0.10 I want to hear more and possibly allay the concerns and not try to muzzle the discussion. Regards Mridul issue, and what I am asking, is which pending bug fixes does anyone anticipate will require breaking the public API guaranteed in rc9 On Sat, May 17, 2014 at 9:44 AM, Mridul Muralidharan mri...@gmail.com wrote: We made incompatible api changes whose impact we don't know yet completely : both from implementation and usage point of view. We had the option of getting real-world feedback from the user community if we had gone to 0.10 but the spark developers seemed to be in a hurry to get to 1.0 - so I made my opinion known but left it to the wisdom of larger group of committers to decide ... I did not think it was critical enough to do a binding -1 on. Regards Mridul On 17-May-2014 9:43 pm, Mark Hamstra m...@clearstorydata.com wrote: Which of the unresolved bugs in spark-core do you think will require an API-breaking change to fix? If there are none of those, then we are still essentially on track for a 1.0.0 release. The number of contributions and pace of change now is quite high, but I don't think that waiting for the pace to slow before releasing 1.0 is viable. If Spark's short history is any guide to its near future, the pace will not slow by any significant amount for any noteworthy length of time, but rather will continue to increase. What we need to be aiming for, I think, is to have the great majority of those new contributions being made to MLLlib, GraphX, SparkSQL and other areas of the code that we have clearly marked as not frozen in 1.x. I think we are already seeing that, but if I am just not recognizing breakage of our semantic versioning guarantee that will be forced on us by some pending changes, now would be a good time to set me straight. 
On Sat, May 17, 2014 at 4:26 AM, Mridul Muralidharan mri...@gmail.com wrote: I had echoed similar sentiments a while back when there was a discussion around 0.10 vs 1.0 ... I would have preferred 0.10 to stabilize the api changes, add missing functionality, go through a hardening release before 1.0 But the community preferred a 1.0 :-) Regards, Mridul On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote: On this note, non-binding commentary: Releases happen in local minima of change, usually created by internally enforced code freeze. Spark is incredibly busy now due to external factors -- recently a TLP, recently discovered by a large new audience, ease of contribution enabled by Github. It's getting like the first year of mainstream battle-testing in a month. It's been very hard to freeze anything! I see a number of non-trivial issues being reported, and I don't think it has been possible to triage all of them, even. Given the high rate of change, my instinct would have been to release 0.10.0 now. But won't it always be very busy? I do think the rate
Re: [VOTE] Release Apache Spark 1.0.0 (rc5)
I would make the case for interface stability not just api stability. Particularly given that we have significantly changed some of our interfaces, I want to ensure developers/users are not seeing red flags. Bugs and code stability can be addressed in minor releases if found, but behavioral change and/or interface changes would be a much more invasive issue for our users. Regards Mridul On 18-May-2014 2:19 am, Matei Zaharia matei.zaha...@gmail.com wrote: As others have said, the 1.0 milestone is about API stability, not about saying “we’ve eliminated all bugs”. The sooner you declare 1.0, the sooner users can confidently build on Spark, knowing that the application they build today will still run on Spark 1.9.9 three years from now. This is something that I’ve seen done badly (and experienced the effects thereof) in other big data projects, such as MapReduce and even YARN. The result is that you annoy users, you end up with a fragmented userbase where everyone is building against a different version, and you drastically slow down development. With a project as fast-growing as fast-growing as Spark in particular, there will be new bugs discovered and reported continuously, especially in the non-core components. Look at the graph of # of contributors in time to Spark: https://www.ohloh.net/p/apache-spark (bottom-most graph; “commits” changed when we started merging each patch as a single commit). This is not slowing down, and we need to have the culture now that we treat API stability and release numbers at the level expected for a 1.0 project instead of having people come in and randomly change the API. I’ll also note that the issues marked “blocker” were marked so by their reporters, since the reporter can set the priority. I don’t consider stuff like parallelize() not partitioning ranges in the same way as other collections a blocker — it’s a bug, it would be good to fix it, but it only affects a small number of use cases. Of course if we find a real blocker (in particular a regression from a previous version, or a feature that’s just completely broken), we will delay the release for that, but at some point you have to say “okay, this fix will go into the next maintenance release”. Maybe we need to write a clear policy for what the issue priorities mean. Finally, I believe it’s much better to have a culture where you can make releases on a regular schedule, and have the option to make a maintenance release in 3-4 days if you find new bugs, than one where you pile up stuff into each release. This is what much large project than us, like Linux, do, and it’s the only way to avoid indefinite stalling with a large contributor base. In the worst case, if you find a new bug that warrants immediate release, it goes into 1.0.1 a week after 1.0.0 (we can vote on 1.0.1 in three days with just your bug fix in it). And if you find an API that you’d like to improve, just add a new one and maybe deprecate the old one — at some point we have to respect our users and let them know that code they write today will still run tomorrow. Matei On May 17, 2014, at 10:32 AM, Kan Zhang kzh...@apache.org wrote: +1 on the running commentary here, non-binding of course :-) On Sat, May 17, 2014 at 8:44 AM, Andrew Ash and...@andrewash.com wrote: +1 on the next release feeling more like a 0.10 than a 1.0 On May 17, 2014 4:38 AM, Mridul Muralidharan mri...@gmail.com wrote: I had echoed similar sentiments a while back when there was a discussion around 0.10 vs 1.0 ... 
I would have preferred 0.10 to stabilize the api changes, add missing functionality, go through a hardening release before 1.0 But the community preferred a 1.0 :-) Regards, Mridul On 17-May-2014 3:19 pm, Sean Owen so...@cloudera.com wrote: On this note, non-binding commentary: Releases happen in local minima of change, usually created by internally enforced code freeze. Spark is incredibly busy now due to external factors -- recently a TLP, recently discovered by a large new audience, ease of contribution enabled by Github. It's getting like the first year of mainstream battle-testing in a month. It's been very hard to freeze anything! I see a number of non-trivial issues being reported, and I don't think it has been possible to triage all of them, even. Given the high rate of change, my instinct would have been to release 0.10.0 now. But won't it always be very busy? I do think the rate of significant issues will slow down. Version ain't nothing but a number, but if it has any meaning it's the semantic versioning meaning. 1.0 imposes extra handicaps around striving to maintain backwards-compatibility. That may end up being bent to fit in important changes that are going to be required in this continuing period of change. Hadoop does this all the time unfortunately and gets away with it, I suppose -- minor version releases are really
[jira] [Commented] (SPARK-1849) Broken UTF-8 encoded data gets character replacements and thus can't be fixed
[ https://issues.apache.org/jira/browse/SPARK-1849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14000397#comment-14000397 ] Mridul Muralidharan commented on SPARK-1849: Looks like textFile is probably the wrong api to use. You cannot recover from badly encoded data ... Better would be to write your own InputFormat which does what you need. Broken UTF-8 encoded data gets character replacements and thus can't be fixed --- Key: SPARK-1849 URL: https://issues.apache.org/jira/browse/SPARK-1849 Project: Spark Issue Type: Bug Reporter: Harry Brundage Fix For: 1.0.0, 0.9.1 Attachments: encoding_test I'm trying to process a file which isn't valid UTF-8 data inside hadoop using Spark via {{sc.textFile()}}. Is this possible, and if not, is this a bug that we should fix? It looks like {{HadoopRDD}} uses {{org.apache.hadoop.io.Text.toString}} on all the data it ever reads, which I believe replaces invalid UTF-8 byte sequences with the UTF-8 replacement character, \uFFFD. Some example code mimicking what {{sc.textFile}} does underneath: {code} scala> sc.textFile(path).collect()(0) res8: String = ?pple scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).map(pair => pair._2.toString).collect()(0).getBytes() res9: Array[Byte] = Array(-17, -65, -67, 112, 112, 108, 101) scala> sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).map(pair => pair._2.getBytes).collect()(0) res10: Array[Byte] = Array(-60, 112, 112, 108, 101) {code} In the above example, the first two snippets show the string representation and byte representation of the example line of text. The third snippet shows what happens if you call {{getBytes}} on the {{Text}} object which comes back from hadoop land: we get the real bytes in the file out. Now, I think this is a bug, though you may disagree. The text inside my file is perfectly valid iso-8859-1 encoded bytes, which I would like to be able to rescue and re-encode into UTF-8, because I want my application to be smart like that. I think Spark should give me the raw broken string so I can re-encode, but I can't get at the original bytes in order to guess at what the source encoding might be, as they have already been replaced. I'm dealing with data from some CDN access logs which are, to put it nicely, diversely encoded, but I think it's a use case Spark should fully support. So, my suggested fix, which I'd like some guidance on, is to change {{textFile}} to spit out broken strings by not using {{Text}}'s UTF-8 encoding. Further compounding this issue is that my application is actually in PySpark, but we can talk about how bytes fly through to Scala land after this if we agree that this is an issue at all. -- This message was sent by Atlassian JIRA (v6.2#6252)
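A minimal sketch of the workaround implied by the comment and by the reporter's own third snippet: bypass textFile, read the Text records directly, and decode the raw bytes with whatever charset the data is actually in (ISO-8859-1 here, matching the report). The path is illustrative; note that only the first getLength bytes of Text's backing array are valid.
{code}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

val sc: SparkContext = ???  // assumed to already exist

// Decode the raw bytes ourselves instead of letting Text.toString force UTF-8
// (which substitutes \uFFFD for invalid sequences).
val lines = sc
  .hadoopFile("hdfs:///logs/access", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map { case (_, text) =>
    // Text reuses its backing array, so copy only the first getLength bytes.
    new String(text.getBytes, 0, text.getLength, "ISO-8859-1")
  }
{code}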
Re: [jira] [Created] (SPARK-1855) Provide memory-and-local-disk RDD checkpointing
Effectively this is persist without fault tolerance. Failure of any node means complete lack of fault tolerance. I would be very skeptical of truncating lineage if it is not reliable. On 17-May-2014 3:49 am, Xiangrui Meng (JIRA) j...@apache.org wrote: Xiangrui Meng created SPARK-1855: Summary: Provide memory-and-local-disk RDD checkpointing Key: SPARK-1855 URL: https://issues.apache.org/jira/browse/SPARK-1855 Project: Spark Issue Type: New Feature Components: MLlib, Spark Core Affects Versions: 1.0.0 Reporter: Xiangrui Meng Checkpointing is used to cut long lineage while maintaining fault tolerance. The current implementation is HDFS-based. Using the BlockRDD we can create in-memory-and-local-disk (with replication) checkpoints that are not as reliable as HDFS-based solution but faster. It can help applications that require many iterations. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: [VOTE] Release Apache Spark 1.0.0 (rc6)
So was rc5 cancelled ? Did not see a note indicating that or why ... [1] - Mridul [1] could have easily missed it in the email storm though ! On Thu, May 15, 2014 at 1:32 AM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.0.0! This patch has a few minor fixes on top of rc5. I've also built the binary artifacts with Hive support enabled so people can test this configuration. When we release 1.0 we might just release both vanilla and Hive-enabled binaries. The tag to be voted on is v1.0.0-rc6 (commit 54133a): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=54133abdce0246f6643a1112a5204afb2c4caa82 The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-1.0.0-rc6/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: https://repository.apache.org/content/repositories/orgapachestratos-1011 The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-1.0.0-rc6-docs/ Please vote on releasing this package as Apache Spark 1.0.0! The vote is open until Saturday, May 17, at 20:58 UTC and passes if amajority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.0.0 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/ == API Changes == We welcome users to compile Spark applications against 1.0. There are a few API changes in this release. Here are links to the associated upgrade guides - user facing changes have been kept as small as possible. changes to ML vector specification: http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/mllib-guide.html#from-09-to-10 changes to the Java API: http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/java-programming-guide.html#upgrading-from-pre-10-versions-of-spark changes to the streaming API: http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/streaming-programming-guide.html#migration-guide-from-091-or-below-to-1x changes to the GraphX API: http://people.apache.org/~pwendell/spark-1.0.0-rc5-docs/graphx-programming-guide.html#upgrade-guide-from-spark-091 coGroup and related functions now return Iterable[T] instead of Seq[T] == Call toSeq on the result to restore the old behavior SparkContext.jarOfClass returns Option[String] instead of Seq[String] == Call toSeq on the result to restore old behavior
Re: [jira] [Created] (SPARK-1767) Prefer HDFS-cached replicas when scheduling data-local tasks
Hi Sandy, I assume you are referring to caching added to datanodes via new caching api via NN ? (To preemptively mmap blocks). I have not looked in detail, but does NN tell us about this in block locations? If yes, we can simply make those process local instead of node local for executors on that node. This would simply be a change to hadoop based rdd partitioning (what makes it tricky is to expose currently 'alive' executors to partition) Thanks Mridul On 15-May-2014 3:49 am, Sandy Ryza (JIRA) j...@apache.org wrote: Sandy Ryza created SPARK-1767: - Summary: Prefer HDFS-cached replicas when scheduling data-local tasks Key: SPARK-1767 URL: https://issues.apache.org/jira/browse/SPARK-1767 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Sandy Ryza -- This message was sent by Atlassian JIRA (v6.2#6252)
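On the question of whether the NameNode reports this: in Hadoop 2.3+ the block locations returned to clients do carry the caching information, via BlockLocation.getCachedHosts alongside the usual replica hosts. A sketch of reading it (path illustrative); how HadoopRDD's preferred locations would actually rank these hosts is deliberately left out.
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val status = fs.getFileStatus(new Path("hdfs:///data/input"))

for (loc <- fs.getFileBlockLocations(status, 0, status.getLen)) {
  val replicaHosts = loc.getHosts       // candidates for node-local scheduling
  val cachedHosts = loc.getCachedHosts  // datanodes holding the block in the HDFS cache
  // A locality-aware scheduler could rank cachedHosts above plain node-local hosts here.
}
{code}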
[jira] [Commented] (SPARK-1813) Add a utility to SparkConf that makes using Kryo really easy
[ https://issues.apache.org/jira/browse/SPARK-1813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13996390#comment-13996390 ] Mridul Muralidharan commented on SPARK-1813: Writing a KryoRegistrator is the only requirement - the rest is done as part of initialization anyway. Registering classes with kryo is non trivial except for degenerate cases : for example, we have classes which have to use java read/write Object serialization, classes which support kryo serialization, classes which support java's external serialization, generated classes, etc. And we would need a registrator ... of course, it could be argued this is a corner case, though I don't think so. Add a utility to SparkConf that makes using Kryo really easy Key: SPARK-1813 URL: https://issues.apache.org/jira/browse/SPARK-1813 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Sandy Ryza It would be nice to have a method in SparkConf that makes it really easy to use Kryo and register a set of classes without defining your own registrator. Using Kryo currently requires all this: {code} import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[MyClass1]) kryo.register(classOf[MyClass2]) } } val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", "mypackage.MyRegistrator") val sc = new SparkContext(conf) {code} It would be nice if it just required this: {code} SparkConf.setKryo(Array(classOf[MyFirstClass], classOf[MySecond])) {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
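A sketch of the kind of non-trivial registrator the comment describes, mixing plain registrations with classes that must fall back to Java or Externalizable serialization inside Kryo. The application classes here are placeholders invented for illustration; the serializers named are standard ones shipped with Kryo 2.x.
{code}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.{ExternalizableSerializer, JavaSerializer}
import org.apache.spark.serializer.KryoRegistrator

// Placeholder application classes, for illustration only.
class PlainRecord(val x: Int)
class LegacyRecord(val s: String) extends Serializable
class ExternalRecord extends java.io.Externalizable {
  var n: Int = 0
  override def writeExternal(out: java.io.ObjectOutput) { out.writeInt(n) }
  override def readExternal(in: java.io.ObjectInput) { n = in.readInt() }
}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[PlainRecord])                                    // let Kryo handle the fields directly
    kryo.register(classOf[LegacyRecord], new JavaSerializer())             // class relying on java Object serialization
    kryo.register(classOf[ExternalRecord], new ExternalizableSerializer()) // class implementing java.io.Externalizable
  }
}
{code}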
Re: bug using kryo as closure serializer
On a slightly related note (apologies Soren for hijacking the thread), Reynold how much better is kryo from spark's usage point of view compared to the default java serialization (in general, not for closures) ? The numbers on the kryo site are interesting, but since you have played the most with kryo in context of spark (i think) - how do you rate it along lines of : 1) computational overhead compared to java serialization. 2) memory overhead. 3) generated byte[] size. Particularly given the bugs Patrick and I had looked into in the past around flush, etc I was always skeptical about using kryo. But given the pretty nasty issues with OOM's via java serialization we are seeing, wanted to know your thoughts on use of kryo with spark. (Will be slightly involved to ensure everything gets registered, but I want to go down the path assuming I hear good things in context of spark) Thanks, Mridul On Mon, May 5, 2014 at 1:20 AM, Reynold Xin r...@databricks.com wrote: I added the config option to use the non-default serializer. However, at the time, Kryo fails serializing pretty much any closures so that option was never really used / recommended. Since then the Scala ecosystem has developed, and some other projects are starting to use Kryo to serialize more Scala data structures, so I wouldn't be surprised if there is a way to work around this now. However, I don't have enough time to look into it at this point. If you do, please do post your findings. Thanks. On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote: apologies for the cross-list posts, but I've gotten zero response in the user list and I guess this list is probably more appropriate. According to the documentation, using the KryoSerializer for closures is supported. However, when I try to set `spark.closure.serializer` to `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably. 
The first thing that happens it that is throws exceptions over and over that it cannot locate my registrator class, which is located in my assembly jar like so: 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63) at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61) at scala.Option.foreach(Option.scala:236) at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61) at org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:116) at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Now, I would expect it not to be able to find this class since it hasn't yet fetched my assembly jar to the executors. Once it does fetch my jar, those expections stop. Next, all the executor task die with the following exception: java.nio.ReadOnlyBufferException at java.nio.ByteBuffer.array(ByteBuffer.java:961) at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438) at
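To put rough numbers on questions (1) and (3) above without involving Spark at all, a micro-benchmark sketch comparing serialized sizes of a representative record under plain Java serialization and Kryo. The Record class is invented for illustration; the results depend heavily on the actual classes involved.
{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

case class Record(id: Long, name: String, weights: Array[Double])

object SerializerSizeCheck {
  // Size of the default Java serialization of obj.
  def javaBytes(obj: AnyRef): Int = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.size()
  }

  // Size of the Kryo serialization of obj.
  def kryoBytes(kryo: Kryo, obj: AnyRef): Int = {
    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    kryo.writeClassAndObject(out, obj)
    out.close()
    bos.size()
  }

  def main(args: Array[String]) {
    val kryo = new Kryo()
    kryo.register(classOf[Record])
    val rec = Record(42L, "example", Array.fill(100)(1.0))
    println("java: " + javaBytes(rec) + " bytes, kryo: " + kryoBytes(kryo, rec) + " bytes")
  }
}
{code}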
[jira] [Commented] (SPARK-1606) spark-submit needs `--arg` for every application parameter
[ https://issues.apache.org/jira/browse/SPARK-1606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988756#comment-13988756 ] Mridul Muralidharan commented on SPARK-1606: Crap, got to this too late. We really should not have added this - and I would have -1'ed it. We have had too many issues with trying to parse the user command line, and gotten into all sorts of problems which are entirely avoidable by simply being explicit about what the user wants to pass. Trying to pass strings which contain escape characters and/or whitespace, etc. is just going to be a nightmare with this change. spark-submit needs `--arg` for every application parameter -- Key: SPARK-1606 URL: https://issues.apache.org/jira/browse/SPARK-1606 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Reporter: Xiangrui Meng Assignee: Patrick Wendell Priority: Blocker Fix For: 1.0.0 If the application has a few parameters, the spark-submit command looks like the following: {code} spark-submit --master yarn-cluster --class main.Class --arg --numPartitions --arg 8 --arg --kryo --arg true {code} It is a little bit hard to read and modify. Maybe it is okay to treat all arguments after `main.Class` as application parameters. {code} spark-submit --master yarn-cluster --class main.Class --numPartitions 8 --kryo true {code} -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1706) Allow multiple executors per worker in Standalone mode
[ https://issues.apache.org/jira/browse/SPARK-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13988868#comment-13988868 ] Mridul Muralidharan commented on SPARK-1706: Oh my, this was supposed to be a logical addition once the yarn changes were done. The yarn changes were very heavily modelled on standalone mode (hence the name yarn-standalone!), and it was supposed to be a two-way street: changes made for yarn support (multi-tenancy, etc.) were supposed to have been added back to standalone mode once yarn support stabilized. Did not realize I never got around to it - my apologies! Allow multiple executors per worker in Standalone mode -- Key: SPARK-1706 URL: https://issues.apache.org/jira/browse/SPARK-1706 Project: Spark Issue Type: Improvement Components: Deploy Reporter: Patrick Wendell Fix For: 1.1.0 Right now if people want to launch multiple executors on each machine they need to start multiple standalone workers. This is not too difficult, but it means you have extra JVMs sitting around. We should just allow users to set a number of cores they want per-executor in standalone mode and then allow packing multiple executors on each node. This would make standalone mode more consistent with YARN in the way you request resources. It's not too big of a change as far as I can see. You'd need to: 1. Introduce a configuration for how many cores you want per executor. 2. Change the scheduling logic in Master.scala to take this into account. 3. Change CoarseGrainedSchedulerBackend to not assume a 1-1 correspondence between hosts and executors. And maybe modify a few other places. -- This message was sent by Atlassian JIRA (v6.2#6252)
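A rough sketch of the user-facing configuration step 1 above asks for. Note that a per-executor core setting did not exist in standalone mode at the time of this comment; the spark.executor.cores name below is an assumption for illustration, while spark.cores.max was an existing setting.

{code}
import org.apache.spark.SparkConf

// Hypothetical: cap the application at 16 cores total and ask for 4 cores per executor,
// so a 16-core standalone worker could pack up to 4 executors for this application.
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .set("spark.cores.max", "16")       // existing setting: total cores for the application
  .set("spark.executor.cores", "4")   // assumed setting: cores per executor (the proposal above)
{code}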
[jira] [Commented] (SPARK-1576) Passing of JAVA_OPTS to YARN on command line
[ https://issues.apache.org/jira/browse/SPARK-1576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981313#comment-13981313 ] Mridul Muralidharan commented on SPARK-1576: There is a misunderstanding here - it is about passing SPARK_JAVA_OPTS, not JAVA_OPTS. Directly passing JAVA_OPTS has been removed. Passing of JAVA_OPTS to YARN on command line Key: SPARK-1576 URL: https://issues.apache.org/jira/browse/SPARK-1576 Project: Spark Issue Type: Improvement Affects Versions: 0.9.0, 1.0.0, 0.9.1 Reporter: Nishkam Ravi Fix For: 0.9.0, 1.0.0, 0.9.1 Attachments: SPARK-1576.patch JAVA_OPTS can be passed by using either env variables (i.e., SPARK_JAVA_OPTS) or as config vars (after Patrick's recent change). It would be good to allow the user to pass them on the command line as well, to restrict scope to a single application invocation. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1586) Fix issues with spark development under windows
[ https://issues.apache.org/jira/browse/SPARK-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13981321#comment-13981321 ] Mridul Muralidharan commented on SPARK-1586: Immediate issues fixed, though there are more Hive tests failing due to path-related issues. PR: https://github.com/apache/spark/pull/505 Fix issues with spark development under windows --- Key: SPARK-1586 URL: https://issues.apache.org/jira/browse/SPARK-1586 Project: Spark Issue Type: Bug Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Fix For: 1.0.0 -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (SPARK-1587) Fix thread leak in spark
[ https://issues.apache.org/jira/browse/SPARK-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan resolved SPARK-1587. Resolution: Fixed Fixed, https://github.com/apache/spark/pull/504 Fix thread leak in spark Key: SPARK-1587 URL: https://issues.apache.org/jira/browse/SPARK-1587 Project: Spark Issue Type: Bug Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan SparkContext.stop does not cause all threads to exit. When running tests via scalatest (which keeps reusing the same VM), this causes threads to accumulate over time, eventually making tests fail because no more threads can be created. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (BOOKKEEPER-560) Create readme for hedwig-client-jms
[ https://issues.apache.org/jira/browse/BOOKKEEPER-560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-560: --- Assignee: (was: Mridul Muralidharan) Create readme for hedwig-client-jms Key: BOOKKEEPER-560 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-560 Project: Bookkeeper Issue Type: Task Components: Documentation Reporter: Ivan Kelly This module needs a readme describing it as an experimental component and what parts of jms are supported and not supported. It would also be good to have a bit more detail on why they can't be supported with hedwig as it is today. A lot of this can be taken verbatim from the package-info.html -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (BOOKKEEPER-648) BasicJMSTest failed
[ https://issues.apache.org/jira/browse/BOOKKEEPER-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mridul Muralidharan updated BOOKKEEPER-648: --- Assignee: (was: Mridul Muralidharan) BasicJMSTest failed --- Key: BOOKKEEPER-648 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-648 Project: Bookkeeper Issue Type: Bug Components: hedwig-client Reporter: Flavio Junqueira While running tests, I got once a failure for this hedwig-client-jms test: BasicJMSTest. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1588) SPARK_JAVA_OPTS is not getting propagated
[ https://issues.apache.org/jira/browse/SPARK-1588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13978827#comment-13978827 ] Mridul Muralidharan commented on SPARK-1588: Apparently, SPARK_YARN_USER_ENV is also broken. SPARK_JAVA_OPTS is not getting propagated - Key: SPARK-1588 URL: https://issues.apache.org/jira/browse/SPARK-1588 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.0.0 Reporter: Mridul Muralidharan Priority: Blocker We could use SPARK_JAVA_OPTS to pass JAVA_OPTS to be used by the master. This is no longer working in the current master branch. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: all values for a key must fit in memory
An iterator does not imply the data has to be memory resident. Think of merge sort output as an iterator (disk backed). Tom and I are actually planning to work on something along these lines, hopefully this month or next. Regards, Mridul On Sun, Apr 20, 2014 at 11:46 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hey all, After a shuffle / groupByKey, Hadoop MapReduce allows the values for a key to not all fit in memory. The current ShuffleFetcher.fetch API, which doesn't distinguish between keys and values, only returning an Iterator[P], seems incompatible with this. Any thoughts on how we could achieve parity here? -Sandy
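To make the point concrete, here is a minimal, hypothetical sketch of a disk-backed iterator: values are streamed from a file one at a time, so only the current element lives in memory. This is an illustration of the idea, not Spark's actual shuffle code.

{code}
import java.io.{EOFException, FileInputStream, ObjectInputStream}

// Iterates over Java-serialized objects stored in a file, holding one element at a time.
class DiskBackedIterator[T](path: String) extends Iterator[T] {
  private val in = new ObjectInputStream(new FileInputStream(path))
  private var nextElem: Option[T] = readNext()

  private def readNext(): Option[T] =
    try Some(in.readObject().asInstanceOf[T])
    catch { case _: EOFException => in.close(); None }

  override def hasNext: Boolean = nextElem.isDefined

  override def next(): T = {
    val elem = nextElem.getOrElse(throw new NoSuchElementException("empty iterator"))
    nextElem = readNext()
    elem
  }
}
{code}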
[jira] [Commented] (SPARK-1524) TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process
[ https://issues.apache.org/jira/browse/SPARK-1524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972864#comment-13972864 ] Mridul Muralidharan commented on SPARK-1524: The expectation is to fall back to a previous schedule type in case the higher level is not valid, though this is tricky in the general case. Will need to take a look at it - though given that I am tied up with other things, if someone else wants to take a crack, please feel free to do so! Btw, use of IPs and multiple hostnames for a host is not supported in Spark - so that is something that will need to be resolved at the deployment end. TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process Key: SPARK-1524 URL: https://issues.apache.org/jira/browse/SPARK-1524 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor ShuffleMapTask is constructed with a TaskLocation which has only the host, not a (host, executorID) pair, in DAGScheduler. When TaskSetManager schedules a ShuffleMapTask which has no preferred executorId using a specific execId host and the PROCESS_LOCAL locality level, no tasks match the given locality constraint in the first search pass. We also find that the host used by the scheduler is a hostname while the host used by TaskLocation is an IP in our cluster. The two hosts do not match, which leaves the pendingTasksForHost HashMap empty and makes the task search behave against our expectation. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13972978#comment-13972978 ] Mridul Muralidharan commented on SPARK-1476: [~matei] We are having some issue porting the netty shuffle copier code to support 2G since only ByteBuf seems to be exposed. Before I dig into netty more, wanted to know if you or someone else from among spark developers knew how to add support for large buffers in our netty code. Thanks ! 2GB limit in spark for blocks - Key: SPARK-1476 URL: https://issues.apache.org/jira/browse/SPARK-1476 Project: Spark Issue Type: Bug Components: Spark Core Environment: all Reporter: Mridul Muralidharan Assignee: Mridul Muralidharan Priority: Critical Fix For: 1.1.0 The underlying abstraction for blocks in spark is a ByteBuffer : which limits the size of the block to 2GB. This has implication not just for managed blocks in use, but also for shuffle blocks (memory mapped blocks are limited to 2gig, even though the api allows for long), ser-deser via byte array backed outstreams (SPARK-1391), etc. This is a severe limitation for use of spark when used on non trivial datasets. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting
[ https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13968390#comment-13968390 ] Mridul Muralidharan commented on SPARK-1453: (d) becomes relevant in case of headless/cron'ed jobs. If the job is user initiated, then I agree, the user would typically kill and restart the job. Improve the way Spark on Yarn waits for executors before starting - Key: SPARK-1453 URL: https://issues.apache.org/jira/browse/SPARK-1453 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.0.0 Reporter: Thomas Graves Assignee: Thomas Graves Currently Spark on Yarn just delays a few seconds between when the spark context is initialized and when it allows the job to start. If you are on a busy hadoop cluster it might take longer to get the requested number of executors. At the very least we could make this timeout a configurable value. It's currently hardcoded to 3 seconds. Better yet would be to allow the user to give a minimum number of executors to wait for, but that looks much more complex. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Comment Edited] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967854#comment-13967854 ] Mridul Muralidharan edited comment on SPARK-1476 at 4/13/14 2:45 PM: - There are multiple issues at play here : a) If a block goes beyond 2G, everything fails - this is the case for shuffle, cached and 'normal' blocks. Irrespective of storage level and/or other config. In a lot of cases, particularly when data generated 'increases' after a map/flatMap/etc, the user has no control over the size increase in the output block for a given input block (think iterations over datasets). b) Increasing the number of partitions is not always an option (for the subset of usecases where it can be done) : 1) It has an impact on the number of intermediate files created while doing a shuffle (ulimit constraints, IO performance issues, etc). 2) It does not help when there is skew anyway. c) Compression codec, serializer used, etc have an impact. d) 2G is an extremely low limit to have on modern hardware : and this is particularly a severe limitation when we have nodes running on 32G to 64G ram and TB's of disk space available for spark. To address specific points raised above : A) [~pwendell] MapReduce jobs don't fail in case the block size of files increases - it might be inefficient, it still runs (not that I know of any case where it does actually, but theoretically I guess it can become inefficient). So the analogy does not apply. Also to add, 2G is not really an unlimited increase in block size - and in MR, the output of a map can easily go a couple of orders above 2G : whether it is followed by a reduce or not. B) [~matei] In the specific cases it was failing, the users were not caching the data but directly going to shuffle. There was no skew from what I see : just the data size per key is high; and there are a lot of keys too btw (as iterations increase and nnz increases). Note that it was an impl detail that it was not being cached - it could have been too. Additionally, compression and/or serialization also apply implicitly in this case, since it was impacting shuffle - the 2G limit was observed at both the map and reduce side (in two different jobs). In general, our effort is to make spark a drop-in replacement for most usecases which are currently being done via MR/Pig/etc. Limitations of this sort make it difficult to position spark as a credible alternative. The current approach we are exploring is to remove all direct references to ByteBuffer from spark (except for the ConnectionManager, etc parts) and rely on a BlockData or similar data structure which encapsulates the data corresponding to a block. By default, a single ByteBuffer should suffice, but in case it does not, the class will automatically take care of splitting across blocks. Similarly, all references to byte array backed streams will need to be replaced with a wrapper stream which multiplexes over byte array streams. The performance impact for all 'normal' usecases should be minimal, while allowing spark to be used in cases where the 2G limit is being hit. The only unknown here is tachyon integration : where the interface is a ByteBuffer - and I am not knowledgeable enough to comment on what the issues there would be. was (Author: mridulm80): There are multiple issues at play here : a) If a block goes beyond 2G, everything fails - this is the case for shuffle, cached and 'normal' blocks. Irrespective of storage level and/or other config.
In a lot of cases, particularly when data generated 'increases' after a map/flatMap/etc, the user has no control over the size increase in the output block for a given input block (think iterations over datasets). b) Increasing the number of partitions is not always an option (for the subset of usecases where it can be done) : 1) It has an impact on the number of intermediate files created while doing a shuffle (ulimit constraints, IO performance issues, etc). 2) It does not help when there is skew anyway. c) Compression codec, serializer used, etc have an impact. d) 2G is an extremely low limit to have on modern hardware : and this is practically a severe limitation when we have nodes running on 32G to 64G ram and TB's of disk space available for spark. To address specific points raised above : A) [~pwendell] MapReduce jobs don't fail in case the block size of files increases - it might be inefficient, it still runs (not that I know of any case where it does actually, but theoretically I guess it can). So the analogy does not apply. To add, 2G is not really an unlimited increase in block size - and in MR, the output of a map can easily go a couple of orders above 2G. B) [~matei] In the specific cases it was failing, the users were not caching the data but directly going to shuffle. There was no skew from what we see : just the data size per key is high
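A minimal sketch of the BlockData idea described in the comment above: a block is represented as a sequence of ByteBuffers rather than a single one, so no individual buffer has to exceed the 2GB limit. The class and method names here are hypothetical, not the eventual Spark API.

{code}
import java.nio.ByteBuffer

// Hypothetical block abstraction: one or more chunks, each bounded by ByteBuffer's 2GB limit.
class BlockData(val chunks: Seq[ByteBuffer]) {
  // The total size can exceed Int.MaxValue even though no single chunk can.
  def size: Long = chunks.map(_.remaining().toLong).sum
}

object BlockData {
  // Common case: the block fits in a single buffer.
  def apply(buf: ByteBuffer): BlockData = new BlockData(Seq(buf))
}
{code}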
[jira] [Commented] (SPARK-1476) 2GB limit in spark for blocks
[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967419#comment-13967419 ] Mridul Muralidharan commented on SPARK-1476: WIP Proposal: - All references to ByteBuffer will need to be replaced with Seq[ByteBuffer]. This applies to the definition of a block, memory mapped file segments for a shuffle block, etc. - All use of byte array backed output streams will need to be replaced with an aggregating output stream which writes to multiple byte array output streams as and when array limits are hit. 2GB limit in spark for blocks - Key: SPARK-1476 URL: https://issues.apache.org/jira/browse/SPARK-1476 Project: Spark Issue Type: Bug Components: Spark Core Environment: all Reporter: Mridul Muralidharan Priority: Critical The underlying abstraction for blocks in spark is a ByteBuffer, which limits the size of a block to 2GB. This has implications not just for managed blocks in use, but also for shuffle blocks (memory mapped blocks are limited to 2gig, even though the api allows for long), ser-deser via byte array backed output streams (SPARK-1391), etc. This is a severe limitation for use of spark when used on non-trivial datasets. -- This message was sent by Atlassian JIRA (v6.2#6252)
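The second bullet of the proposal - an aggregating output stream - could look roughly like the sketch below: roll over to a fresh ByteArrayOutputStream before any single backing array gets too large, and expose the result as multiple arrays. The class name and chunk size are assumptions for illustration; this is not the actual Spark implementation.

{code}
import java.io.{ByteArrayOutputStream, OutputStream}
import scala.collection.mutable.ArrayBuffer

// Aggregates writes across multiple byte array output streams so that no single
// backing array has to hold the whole (possibly >2GB) payload.
class ChunkedByteArrayOutputStream(chunkSize: Int = 64 * 1024 * 1024) extends OutputStream {
  private val chunks = ArrayBuffer(new ByteArrayOutputStream(chunkSize))

  override def write(b: Int): Unit = {
    if (chunks.last.size() >= chunkSize) {
      chunks += new ByteArrayOutputStream(chunkSize)  // roll over before hitting array limits
    }
    chunks.last.write(b)
  }

  // Expose the data as a sequence of arrays instead of one giant array.
  def toChunks: Seq[Array[Byte]] = chunks.map(_.toByteArray)
}
{code}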
[jira] [Commented] (SPARK-1391) BlockManager cannot transfer blocks larger than 2G in size
[ https://issues.apache.org/jira/browse/SPARK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13966332#comment-13966332 ] Mridul Muralidharan commented on SPARK-1391: Another place where this is relevant is here : java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:98) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:413) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:339) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:506) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39) at org.apache.spark.rdd.RDD.iterator(RDD.scala:233) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:52) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1262) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) So we might want to change the abstraction from a single ByteBuffer to a sequence of ByteBuffers ... BlockManager cannot transfer blocks larger than 2G in size -- Key: SPARK-1391 URL: https://issues.apache.org/jira/browse/SPARK-1391 Project: Spark Issue Type: Bug Components: Block Manager, Shuffle Affects Versions: 1.0.0 Reporter: Shivaram Venkataraman Assignee: Min Zhou Attachments: SPARK-1391.diff If a task tries to remotely access a cached RDD block, I get an exception when the block size is > 2G. The exception is pasted below. Memory capacities are huge these days (> 60G), and many workflows depend on having large blocks in memory, so it would be good to fix this bug. I don't know if the same thing happens on shuffles if one transfer (from mapper to reducer) is > 2G.
{noformat} 14/04/02 02:33:10 ERROR storage.BlockManagerWorker: Exception handling buffer message java.lang.ArrayIndexOutOfBoundsException at it.unimi.dsi.fastutil.io.FastByteArrayOutputStream.write(FastByteArrayOutputStream.java:96) at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.dumpBuffer(FastBufferedOutputStream.java:134) at it.unimi.dsi.fastutil.io.FastBufferedOutputStream.write(FastBufferedOutputStream.java:164) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:38) at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:93) at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:26) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:913) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:922) at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:102) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:348) at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:323) at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90) at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) at scala.collection.TraversableLike$$anonfun$map$1.apply
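The Size exceeds Integer.MAX_VALUE failure in the comment above comes from FileChannel.map, which caps each mapping at Int.MaxValue bytes. An illustrative workaround sketch (not Spark code) is to map a large file as several buffers, which is essentially the sequence-of-ByteBuffers abstraction suggested in that comment:

{code}
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Maps a file of arbitrary size as a series of read-only chunks, each <= Int.MaxValue bytes.
def mapInChunks(path: String, chunkSize: Long = Int.MaxValue.toLong): Seq[MappedByteBuffer] = {
  val channel = new RandomAccessFile(path, "r").getChannel
  try {
    val length = channel.size()
    (0L until length by chunkSize).map { offset =>
      channel.map(FileChannel.MapMode.READ_ONLY, offset, math.min(chunkSize, length - offset))
    }
  } finally {
    channel.close()  // the mappings remain valid after the channel is closed
  }
}
{code}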
[jira] [Commented] (SPARK-542) Cache Miss when machine have multiple hostname
[ https://issues.apache.org/jira/browse/SPARK-542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967185#comment-13967185 ] Mridul Muralidharan commented on SPARK-542: --- Spark uses only hostnames - not IPs. Even for hostnames, it should ideally pick only the canonical hostname - not the others. This was done by design in 0.8 ... trying to figure out whether multiple hostnames/IPs all refer to the same physical host/container is fraught with too many issues. Cache Miss when machine have multiple hostname -- Key: SPARK-542 URL: https://issues.apache.org/jira/browse/SPARK-542 Project: Spark Issue Type: Bug Reporter: frankvictor Hi, I encountered weird pagerank runtimes in the last few days. After debugging the job, I found it was caused by DNS names. The machines of my cluster have multiple hostnames; for example, slave 1 has the names c001 and c001.cm.cluster. When Spark adds a cache entry in the CacheTracker, it gets c001 and registers the cache with it. But when scheduling a task in SimpleJob, the Mesos offer gives Spark c001.cm.cluster, so it will never get a preferred location! I think Spark should handle the multiple-hostname case (by using IPs instead of hostnames, or some other method). Thanks! -- This message was sent by Atlassian JIRA (v6.2#6252)
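The canonical-hostname point can be illustrated with plain JDK calls; which name comes back depends entirely on the DNS/hosts setup, and the c001 name below is the hypothetical short name from the report:

{code}
import java.net.InetAddress

// The same machine may be reachable under several names; only one is canonical.
val addr = InetAddress.getByName("c001")               // hypothetical short name
println(addr.getHostName)                              // the name we looked up
println(addr.getCanonicalHostName)                     // e.g. "c001.cm.cluster", per DNS configuration
println(InetAddress.getLocalHost.getCanonicalHostName) // canonical name of the local machine
{code}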
[jira] [Commented] (SPARK-1453) Improve the way Spark on Yarn waits for executors before starting
[ https://issues.apache.org/jira/browse/SPARK-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13967193#comment-13967193 ] Mridul Muralidharan commented on SPARK-1453: The timeout gets hit only when we don't get the requested executors, right? So it is more like a max timeout (controlled by the number of times we loop, iirc). The reason for keeping it stupid was simply that we have no guarantees on the number of containers which might be available to spark in a busy cluster : at times, it might not be practically possible to even get a fraction of the requested nodes (either due to a busy cluster, or because of lack of resources - so an infinite wait). Ideally, I should have exposed the number of containers allocated - so that at least user code could use it as an SPI and decide how to proceed in more complex cases. Missed out on this one. I am not sure which usecases make sense. a) Wait for X seconds or until the requested containers are allocated. b) Wait until a minimum of Y containers are allocated (out of X requested). c) (b) with (a) - that is, min containers and a timeout on that. d) (c) with exit if min containers not allocated ? (d) is something which I keep running into (if I don't get my required minimum nodes and the job proceeds, I usually end up bringing down those nodes :-( ) Improve the way Spark on Yarn waits for executors before starting - Key: SPARK-1453 URL: https://issues.apache.org/jira/browse/SPARK-1453 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 1.0.0 Reporter: Thomas Graves Assignee: Thomas Graves Currently Spark on Yarn just delays a few seconds between when the spark context is initialized and when it allows the job to start. If you are on a busy hadoop cluster it might take longer to get the requested number of executors. At the very least we could make this timeout a configurable value. It's currently hardcoded to 3 seconds. Better yet would be to allow the user to give a minimum number of executors to wait for, but that looks much more complex. -- This message was sent by Atlassian JIRA (v6.2#6252)
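For illustration, options (c)/(d) above amount to a small polling loop along these lines. The method and the registeredExecutors accessor are hypothetical stand-ins, not the actual YARN allocator code:

{code}
// Wait until at least minExecutors are registered, bounded by maxWaitMs; optionally fail fast (option d).
def waitForExecutors(minExecutors: Int, maxWaitMs: Long, failIfNotMet: Boolean)
                    (registeredExecutors: () => Int): Unit = {
  val deadline = System.currentTimeMillis() + maxWaitMs
  while (registeredExecutors() < minExecutors && System.currentTimeMillis() < deadline) {
    Thread.sleep(200)  // poll the allocator periodically
  }
  if (failIfNotMet && registeredExecutors() < minExecutors) {
    throw new IllegalStateException(
      s"Only ${registeredExecutors()} of the requested minimum $minExecutors executors were allocated")
  }
}
{code}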
ephemeral storage level in spark ?
Hi, We have a requirement to use a (potentially) ephemeral storage which is not within the VM but is strongly tied to a worker node. So the source of truth for a block would still be within spark; but to actually do computation, we would need to copy data to an external device (where it might lie around for a while - so data locality really helps, since we can avoid a subsequent copy if the data is already present when computing on the same block again). I was wondering if the recently added storage level for tachyon would help in this case (note, tachyon won't help; just the storage level might). What sort of guarantees does it provide? How extensible is it? Or is it strongly tied to tachyon with only a generic name? Thanks, Mridul
Re: ephemeral storage level in spark ?
No, I am thinking along lines of writing to an accelerator card or dedicated card with its own memory. Regards, Mridul On Apr 6, 2014 5:19 AM, Haoyuan Li haoyuan...@gmail.com wrote: Hi Mridul, Do you mean the scenario that different Spark applications need to read the same raw data, which is stored in a remote cluster or machines. And the goal is to load the remote raw data only once? Haoyuan On Sat, Apr 5, 2014 at 4:30 PM, Mridul Muralidharan mri...@gmail.com wrote: Hi, We have a requirement to use a (potential) ephemeral storage, which is not within the VM, which is strongly tied to a worker node. So source of truth for a block would still be within spark; but to actually do computation, we would need to copy data to external device (where it might lie around for a while : so data locality really really helps if we can avoid a subsequent copy if it is already present on computations on same block again). I was wondering if the recently added storage level for tachyon would help in this case (note, tachyon wont help; just the storage level might). What sort of guarantees does it provide ? How extensible is it ? Or is it strongly tied to tachyon with only a generic name ? Thanks, Mridul -- Haoyuan Li Algorithms, Machines, People Lab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
JIRA. github and asf updates
Hi, So we are now receiving updates from three sources for each change to a PR. While each of them handles a corner case which the others might miss, it would be great if we could minimize the volume of duplicated communication. Regards, Mridul
Re: JIRA. github and asf updates
If the PR comments are going to be replicated into the jiras and they are going to be sent to dev@, then we could keep that and remove the [Github] updates? The last one was added since discussions were happening off the apache lists - which should be handled by the jira updates? I don't mind the mails if they have content - this is just duplication of the same message in three mails :-) Btw, this is a good problem to have - a vibrant and very actively engaged community generating a lot of meaningful traffic! I just don't want to get distracted from it by repetitions. Regards, Mridul On Sat, Mar 29, 2014 at 11:46 PM, Patrick Wendell pwend...@gmail.com wrote: Ah sorry I see - Jira updates are going to the dev list. Maybe that's not desirable. I think we should send them to the issues@ list. On Sat, Mar 29, 2014 at 11:16 AM, Patrick Wendell pwend...@gmail.com wrote: Mridul, You can unsubscribe yourself from any of these sources, right? - Patrick On Sat, Mar 29, 2014 at 11:05 AM, Mridul Muralidharan mri...@gmail.com wrote: Hi, So we are now receiving updates from three sources for each change to the PR. While each of them handles a corner case which others might miss, would be great if we could minimize the volume of duplicated communication. Regards, Mridul
Re: Spark 0.9.1 release
On Wed, Mar 26, 2014 at 10:53 AM, Tathagata Das tathagata.das1...@gmail.com wrote: PR 159 seems like a fairly big patch to me. And quite recent, so its impact on the scheduling is not clear. It may also depend on other changes that may have gotten into the DAGScheduler but not pulled into branch 0.9. I am not sure it is a good idea to pull that in. We can pull those changes later for 0.9.2 if required. There is no impact on scheduling : it only has an impact on error handling - it ensures that you can actually use spark on yarn in multi-tenant clusters more reliably. Currently, any reasonably long running job (30 mins+) working on a non-trivial dataset will fail due to accumulated failures in spark. Regards, Mridul TD On Tue, Mar 25, 2014 at 8:44 PM, Mridul Muralidharan mri...@gmail.com wrote: Forgot to mention this in the earlier request for PRs. If there is another RC being cut, please add https://github.com/apache/spark/pull/159 to it too (if not done already!). Thanks, Mridul On Thu, Mar 20, 2014 at 5:37 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Hello everyone, Since the release of Spark 0.9, we have received a number of important bug fixes and we would like to make a bug-fix release of Spark 0.9.1. We are going to cut a release candidate soon and we would love it if people test it out. We have backported several bug fixes into the 0.9 and updated JIRA accordingly https://spark-project.atlassian.net/browse/SPARK-1275?jql=project%20in%20(SPARK%2C%20BLINKDB%2C%20MLI%2C%20MLLIB%2C%20SHARK%2C%20STREAMING%2C%20GRAPH%2C%20TACHYON)%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20(Resolved%2C%20Closed) . Please let me know if there are fixes that were not backported but you would like to see them in 0.9.1. Thanks! TD
Re: Spark 0.9.1 release
It would be great if the garbage collection PR is also committed - if not the whole thing, at least the part to unpersist broadcast variables explicitly. Currently we are running with a custom impl which does something similar, and I would like to move to the standard distribution for that. Thanks, Mridul On Wed, Mar 19, 2014 at 5:07 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Hello everyone, Since the release of Spark 0.9, we have received a number of important bug fixes and we would like to make a bug-fix release of Spark 0.9.1. We are going to cut a release candidate soon and we would love it if people test it out. We have backported several bug fixes into the 0.9 and updated JIRA accordingly https://spark-project.atlassian.net/browse/SPARK-1275?jql=project%20in%20(SPARK%2C%20BLINKDB%2C%20MLI%2C%20MLLIB%2C%20SHARK%2C%20STREAMING%2C%20GRAPH%2C%20TACHYON)%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20(Resolved%2C%20Closed). Please let me know if there are fixes that were not backported but you would like to see them in 0.9.1. Thanks! TD
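The usage being asked for here is roughly the following sketch. Note that at the time of this thread the explicit unpersist call on broadcast variables was part of the pending garbage collection PR rather than the 0.9 release, so this illustrates the intended API usage, not the custom internal implementation mentioned above.

{code}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-unpersist-sketch"))

val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val counts = sc.parallelize(Seq("a", "b", "c")).map(k => lookup.value.getOrElse(k, 0)).collect()

// Explicitly drop the broadcast's cached copies once it is no longer needed.
lookup.unpersist()
{code}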
Re: Spark 0.9.1 release
If 1.0 is just around the corner, then it is fair enough to push it to that, thanks for clarifying! Regards, Mridul On Wed, Mar 19, 2014 at 6:12 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I agree that the garbage collection PR https://github.com/apache/spark/pull/126 would make things very convenient in a lot of usecases. However, there are two broad reasons why it is hard for that PR to get into 0.9.1. 1. The PR still needs some amount of work and quite a lot of testing. While we enable RDD and shuffle cleanup based on Java GC, its behavior in real workloads still needs to be understood (especially since it is tied to the Spark driver's garbage collection behavior). 2. This actually changes some of the semantic behavior of Spark and should not be included in a bug-fix release. The PR will definitely be present for Spark 1.0, which is expected to be released around the end of April (not too far ;) ). TD On Wed, Mar 19, 2014 at 5:57 PM, Mridul Muralidharan mri...@gmail.com wrote: Would be great if the garbage collection PR is also committed - if not the whole thing, atleast the part to unpersist broadcast variables explicitly would be great. Currently we are running with a custom impl which does something similar, and I would like to move to standard distribution for that. Thanks, Mridul On Wed, Mar 19, 2014 at 5:07 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Hello everyone, Since the release of Spark 0.9, we have received a number of important bug fixes and we would like to make a bug-fix release of Spark 0.9.1. We are going to cut a release candidate soon and we would love it if people test it out. We have backported several bug fixes into the 0.9 and updated JIRA accordingly https://spark-project.atlassian.net/browse/SPARK-1275?jql=project%20in%20(SPARK%2C%20BLINKDB%2C%20MLI%2C%20MLLIB%2C%20SHARK%2C%20STREAMING%2C%20GRAPH%2C%20TACHYON)%20AND%20fixVersion%20%3D%200.9.1%20AND%20status%20in%20(Resolved%2C%20Closed) . Please let me know if there are fixes that were not backported but you would like to see them in 0.9.1. Thanks! TD
Re: Fwd: ASF Board Meeting Summary - February 19, 2014
Wonderful news ! Congrats all :-) Regards, Mridul On Feb 20, 2014 10:07 PM, Andy Konwinski andykonwin...@gmail.com wrote: Congrats Spark community! I think this means we are officially now a TLP! -- Forwarded message -- From: Brett Porter chair...@apache.org Date: Feb 19, 2014 11:26 PM Subject: ASF Board Meeting Summary - February 19, 2014 To: committ...@apache.org Cc: The February board meeting took place on the 19th. The following directors were present: Shane Curcuru Bertrand Delacretaz Roy T. Fielding Jim Jagielski Chris Mattmann Brett Porter Greg Stein Apologies were received from Sam Ruby. The following officers were present: Ross Gardler Rich Bowen Craig L Russell The following guests were present: Sean Kelly Daniel Gruno Phil Steitz Jake Farrell Marvin Humphrey David Nalley Noah Slater The January minutes were approved. Minutes will be posted to http://www.apache.org/foundation/records/minutes/ The following reports were not approved and are expected next month: Report from the Apache Lenya Project [Richard Frovarp] The following reports were not received and are expected next month: Report from the Apache Abdera Project [Ant Elder] Report from the Apache Buildr Project [Alex Boisvert] Report from the Apache Click Project [Malcolm Edgar] Report from the Apache Community Development Project [Luciano Resende] Report from the Apache Continuum Project [Brent Atkinson] Report from the Apache Creadur Project [Robert Burrell Donkin] Report from the Apache DirectMemory Project [Raffaele P. Guidi] Report from the Apache Giraph Project [Avery Ching] Report from the Apache Velocity Project [Nathan Bubna] All other reports to the board were approved. The following resolutions were passed unanimously: A. Establish the Apache Open Climate Workbench Project (Michael Joyce, VP) B. Change the Apache Incubator Project Chair (Roman Shaposhnik, VP) C. Establish the Apache Spark Project (Matei Zaharia, VP) D. Establish the Apache Knox Project (Kevin Minder, VP) The next board meeting will be on the 19th of March.
Re: coding style discussion: explicit return type in public APIs
You are right. A degenerate case would be : def createFoo = new FooImpl() vs def createFoo: Foo = new FooImpl() Former will cause api instability. Reynold, maybe this is already avoided - and I understood it wrong ? Thanks, Mridul On Wed, Feb 19, 2014 at 12:44 PM, Christopher Nguyen c...@adatao.com wrote: Mridul, IIUUC, what you've mentioned did come to mind, but I deemed it orthogonal to the stylistic issue Reynold is talking about. I believe you're referring to the case where there is a specific desired return type by API design, but the implementation does not, in which case, of course, one must define the return type. That's an API requirement and not just a matter of readability. We could add this as an NB in the proposed guideline. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Tue, Feb 18, 2014 at 10:40 PM, Reynold Xin r...@databricks.com wrote: +1 Christopher's suggestion. Mridul, How would that happen? Case 3 requires the method to be invoking the constructor directly. It was implicit in my email, but the return type should be the same as the class itself. On Tue, Feb 18, 2014 at 10:37 PM, Mridul Muralidharan mri...@gmail.com wrote: Case 3 can be a potential issue. Current implementation might be returning a concrete class which we might want to change later - making it a type change. The intention might be to return an RDD (for example), but the inferred type might be a subclass of RDD - and future changes will cause signature change. Regards, Mridul On Wed, Feb 19, 2014 at 11:52 AM, Reynold Xin r...@databricks.com wrote: Hi guys, Want to bring to the table this issue to see what other members of the community think and then we can codify it in the Spark coding style guide. The topic is about declaring return types explicitly in public APIs. In general I think we should favor explicit type declaration in public APIs. However, I do think there are 3 cases we can avoid the public API definition because in these 3 cases the types are self-evident repetitive. Case 1. toString Case 2. A method returning a string or a val defining a string def name = abcd // this is so obvious that it is a string val name = edfg // this too Case 3. The method or variable is invoking the constructor of a class and return that immediately. For example: val a = new SparkContext(...) implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) Thoughts?
Re: coding style discussion: explicit return type in public APIs
Without bikeshedding this too much ... It is likely incorrect (not wrong) - and rules like this potentially cause things to slip through. An explicit return type strictly specifies what is being exposed (think in the face of an impl change - createFoo changes in future from Foo to Foo1 or Foo2) .. being conservative about how to specify exposed interfaces, imo, outweighs potential gains in brevity of code. Btw this is a degenerate contrived example already stretching its use ... Regards, Mridul On Feb 19, 2014 1:49 PM, Reynold Xin r...@databricks.com wrote: Yes, the case you brought up is not a matter of readability or style. If it returns a different type, it should be declared (otherwise it is just wrong). On Wed, Feb 19, 2014 at 12:17 AM, Mridul Muralidharan mri...@gmail.com wrote: You are right. A degenerate case would be : def createFoo = new FooImpl() vs def createFoo: Foo = new FooImpl() Former will cause api instability. Reynold, maybe this is already avoided - and I understood it wrong ? Thanks, Mridul On Wed, Feb 19, 2014 at 12:44 PM, Christopher Nguyen c...@adatao.com wrote: Mridul, IIUUC, what you've mentioned did come to mind, but I deemed it orthogonal to the stylistic issue Reynold is talking about. I believe you're referring to the case where there is a specific desired return type by API design, but the implementation does not, in which case, of course, one must define the return type. That's an API requirement and not just a matter of readability. We could add this as an NB in the proposed guideline. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Tue, Feb 18, 2014 at 10:40 PM, Reynold Xin r...@databricks.com wrote: +1 Christopher's suggestion. Mridul, How would that happen? Case 3 requires the method to be invoking the constructor directly. It was implicit in my email, but the return type should be the same as the class itself. On Tue, Feb 18, 2014 at 10:37 PM, Mridul Muralidharan mri...@gmail.com wrote: Case 3 can be a potential issue. Current implementation might be returning a concrete class which we might want to change later - making it a type change. The intention might be to return an RDD (for example), but the inferred type might be a subclass of RDD - and future changes will cause signature change. Regards, Mridul On Wed, Feb 19, 2014 at 11:52 AM, Reynold Xin r...@databricks.com wrote: Hi guys, Want to bring to the table this issue to see what other members of the community think and then we can codify it in the Spark coding style guide. The topic is about declaring return types explicitly in public APIs. In general I think we should favor explicit type declaration in public APIs. However, I do think there are 3 cases we can avoid the public API definition because in these 3 cases the types are self-evident repetitive. Case 1. toString Case 2. A method returning a string or a val defining a string def name = abcd // this is so obvious that it is a string val name = edfg // this too Case 3. The method or variable is invoking the constructor of a class and return that immediately. For example: val a = new SparkContext(...) implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) Thoughts?
Re: coding style discussion: explicit return type in public APIs
My initial mail had it listed, adding more details here since I assume I am missing something or not being clear - please note, this is just illustrative and my scala knowledge is bad :-) (I am trying to draw parallels from mistakes in java world) def createFoo = new Foo() To def createFoo = new Foo1() To def createFoo = new Foo2() (appropriate inheritance applied - parent Foo). I am thinking from api evolution and binary compatibility point of view Regards, Mridul On Feb 20, 2014 12:12 AM, Reynold Xin r...@databricks.com wrote: Mridul, Can you be more specific in the createFoo example? def myFunc = createFoo is disallowed in my guideline. It is invoking a function createFoo, not the constructor of Foo. On Wed, Feb 19, 2014 at 10:39 AM, Mridul Muralidharan mri...@gmail.com wrote: Without bikeshedding this too much ... It is likely incorrect (not wrong) - and rules like this potentially cause things to slip through. Explicit return type strictly specifies what is being exposed (think in face of impl change - createFoo changes in future from Foo to Foo1 or Foo2) .. being conservative about how to specify exposed interfaces, imo, outweighs potential gains in breveity of code. Btw this is a degenerate contrieved example already stretching its use ... Regards Mridul Regards Mridul On Feb 19, 2014 1:49 PM, Reynold Xin r...@databricks.com wrote: Yes, the case you brought up is not a matter of readability or style. If it returns a different type, it should be declared (otherwise it is just wrong). On Wed, Feb 19, 2014 at 12:17 AM, Mridul Muralidharan mri...@gmail.com wrote: You are right. A degenerate case would be : def createFoo = new FooImpl() vs def createFoo: Foo = new FooImpl() Former will cause api instability. Reynold, maybe this is already avoided - and I understood it wrong ? Thanks, Mridul On Wed, Feb 19, 2014 at 12:44 PM, Christopher Nguyen c...@adatao.com wrote: Mridul, IIUUC, what you've mentioned did come to mind, but I deemed it orthogonal to the stylistic issue Reynold is talking about. I believe you're referring to the case where there is a specific desired return type by API design, but the implementation does not, in which case, of course, one must define the return type. That's an API requirement and not just a matter of readability. We could add this as an NB in the proposed guideline. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Tue, Feb 18, 2014 at 10:40 PM, Reynold Xin r...@databricks.com wrote: +1 Christopher's suggestion. Mridul, How would that happen? Case 3 requires the method to be invoking the constructor directly. It was implicit in my email, but the return type should be the same as the class itself. On Tue, Feb 18, 2014 at 10:37 PM, Mridul Muralidharan mri...@gmail.com wrote: Case 3 can be a potential issue. Current implementation might be returning a concrete class which we might want to change later - making it a type change. The intention might be to return an RDD (for example), but the inferred type might be a subclass of RDD - and future changes will cause signature change. Regards, Mridul On Wed, Feb 19, 2014 at 11:52 AM, Reynold Xin r...@databricks.com wrote: Hi guys, Want to bring to the table this issue to see what other members of the community think and then we can codify it in the Spark coding style guide. The topic is about declaring return types explicitly in public APIs. In general I think we should favor explicit type declaration in public APIs. 
However, I do think there are 3 cases we can avoid the public API definition because in these 3 cases the types are self-evident repetitive. Case 1. toString Case 2. A method returning a string or a val defining a string def name = abcd // this is so obvious that it is a string val name = edfg // this too Case 3. The method or variable is invoking the constructor of a class and return that immediately. For example: val a = new SparkContext(...) implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) Thoughts?
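To make the createFoo concern from this thread concrete, here is a small illustrative Scala sketch (the types are made up for the example): with an inferred return type the public signature is the implementation class, so later swapping the implementation changes the API, whereas an explicit return type keeps the signature stable.

{code}
trait Foo
class FooImpl1 extends Foo
class FooImpl2 extends Foo

object Factory {
  // Inferred result type is FooImpl1. If the implementation later switches to FooImpl2,
  // the public signature changes and callers compiled against the old one break.
  def createFooInferred = new FooImpl1

  // Explicit result type: the exposed signature stays Foo across implementation changes.
  def createFoo: Foo = new FooImpl1
}
{code}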