[jira] [Commented] (SPARK-13877) Consider removing Kafka modules from Spark / Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15200236#comment-15200236 ] Kostas Sakellis commented on SPARK-13877: - Why have we not more seriously considered Spark subprojects? I think it makes more sense than pulling this functionality out of Spark completely. It will give these modules their own release trains and have a good separation from core Spark while under a software governance model that we all understand. > Consider removing Kafka modules from Spark / Spark Streaming > > > Key: SPARK-13877 > URL: https://issues.apache.org/jira/browse/SPARK-13877 > Project: Spark > Issue Type: Sub-task > Components: Spark Core, Streaming >Affects Versions: 1.6.1 >Reporter: Hari Shreedharan > > Based on the discussion on the PR for SPARK-13843 > ([here|https://github.com/apache/spark/pull/11672#issuecomment-196553283]), > we should consider moving the Kafka modules out of Spark as well. > Providing newer functionality (like security) has become painful while > maintaining compatibility with older versions of Kafka. Moving this out > allows more flexibility, allowing users to mix and match Kafka and Spark > versions. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13877) Consider removing Kafka modules from Spark / Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15200252#comment-15200252 ] Kostas Sakellis commented on SPARK-13877: - How is this any different than creating a random repo anywhere else? > Consider removing Kafka modules from Spark / Spark Streaming > > > Key: SPARK-13877 > URL: https://issues.apache.org/jira/browse/SPARK-13877 > Project: Spark > Issue Type: Sub-task > Components: Spark Core, Streaming >Affects Versions: 1.6.1 >Reporter: Hari Shreedharan > > Based on the discussion the PR for SPARK-13843 > ([here|https://github.com/apache/spark/pull/11672#issuecomment-196553283]), > we should consider moving the Kafka modules out of Spark as well. > Providing newer functionality (like security) has become painful while > maintaining compatibility with older versions of Kafka. Moving this out > allows more flexibility, allowing users to mix and match Kafka and Spark > versions. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9197) Cached RDD partitions are lost when executors are dynamically deallocated
[ https://issues.apache.org/jira/browse/SPARK-9197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634748#comment-14634748 ] Kostas Sakellis commented on SPARK-9197: I think this is a duplicate of: https://issues.apache.org/jira/browse/SPARK-7955 Cached RDD partitions are lost when executors are dynamically deallocated - Key: SPARK-9197 URL: https://issues.apache.org/jira/browse/SPARK-9197 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.4.1 Reporter: Ryan Williams Priority: Minor Currently, dynamic allocation cleans up executors that have not run any tasks for a certain amount of time. However, this often leads to cached RDD partitions being lost. Should dynamic allocation leave executors alone that have cached partitions? Should this be configurable? Is there any interest in code that would shuffle cached partitions around in preparation for executor-deallocation, to avoid this? Such logic could be useful in general for maintaining persisted RDDs across executor churn. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-7161) Provide REST api to download event logs from History Server
[ https://issues.apache.org/jira/browse/SPARK-7161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Sakellis updated SPARK-7161: --- Component/s: (was: Streaming) Spark Core Provide REST api to download event logs from History Server --- Key: SPARK-7161 URL: https://issues.apache.org/jira/browse/SPARK-7161 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.3.1 Reporter: Hari Shreedharan Priority: Minor The idea is to tar up the logs and return the tar.gz file using a REST api. This can be used for debugging even after the app is done. I am planning to take a look at this. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6506) python support yarn cluster mode requires SPARK_HOME to be set
[ https://issues.apache.org/jira/browse/SPARK-6506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14484250#comment-14484250 ] Kostas Sakellis commented on SPARK-6506: Here is the exception I saw when I ran the above job:
{code}
Traceback (most recent call last):
  File "pi.py", line 29, in <module>
    sc = SparkContext(appName="PythonPi")
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.23/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/context.py", line 108, in __init__
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.23/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/context.py", line 222, in _ensure_initialized
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.23/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/java_gateway.py", line 32, in launch_gateway
  File "/usr/lib64/python2.6/UserDict.py", line 22, in __getitem__
    raise KeyError(key)
KeyError: 'SPARK_HOME'
{code}
python support yarn cluster mode requires SPARK_HOME to be set -- Key: SPARK-6506 URL: https://issues.apache.org/jira/browse/SPARK-6506 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.3.0 Reporter: Thomas Graves We added support for python running in yarn cluster mode in https://issues.apache.org/jira/browse/SPARK-5173, but it requires that SPARK_HOME be set in the environment variables for application master and executor. It doesn't have to be set to anything real but it fails if it's not set. See the command at the end of: https://github.com/apache/spark/pull/3976 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6506) python support yarn cluster mode requires SPARK_HOME to be set
[ https://issues.apache.org/jira/browse/SPARK-6506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482414#comment-14482414 ] Kostas Sakellis commented on SPARK-6506: I ran into this issue too by running: bq. spark-submit --master yarn-cluster examples/pi.py 4 it looks like I only had to set: spark.yarn.appMasterEnv.SPARK_HOME=/bogus to get it going: bq. spark-submit --conf spark.yarn.appMasterEnv.SPARK_HOME=/bogus --master yarn-cluster pi.py 4 python support yarn cluster mode requires SPARK_HOME to be set -- Key: SPARK-6506 URL: https://issues.apache.org/jira/browse/SPARK-6506 Project: Spark Issue Type: Bug Components: YARN Affects Versions: 1.3.0 Reporter: Thomas Graves We added support for python running in yarn cluster mode in https://issues.apache.org/jira/browse/SPARK-5173, but it requires that SPARK_HOME be set in the environment variables for application master and executor. It doesn't have to be set to anything real but it fails if its not set. See the command at the end of: https://github.com/apache/spark/pull/3976 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14352306#comment-14352306 ] Kostas Sakellis commented on SPARK-1239: How many reduce-side tasks do you have? Can you please attach your logs that show the OOM errors? Don't fetch all map output statuses at each reducer during shuffles --- Key: SPARK-1239 URL: https://issues.apache.org/jira/browse/SPARK-1239 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.0.2, 1.1.0 Reporter: Patrick Wendell Instead we should modify the way we fetch map output statuses to take both a mapper and a reducer - or we should just piggyback the statuses on each task. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-5721) Propagate missing external shuffle service errors to client
Kostas Sakellis created SPARK-5721: -- Summary: Propagate missing external shuffle service errors to client Key: SPARK-5721 URL: https://issues.apache.org/jira/browse/SPARK-5721 Project: Spark Issue Type: Bug Components: Spark Core, YARN Reporter: Kostas Sakellis When spark.shuffle.service.enabled=true, the yarn AM expects to find an aux service running in the namenode. If it cannot find one an exception like this is present in the app master logs. {noformat} Exception in thread ContainerLauncher #0 Exception in thread ContainerLauncher #1 java.lang.Error: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206) at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:110) at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:65) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ... 2 more java.lang.Error: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206) at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:110) at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:65) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ... 2 more {noformat} We should propagate this error to the driver (in yarn-client mode) because it is otherwise unclear why the number of executors expected are not starting up. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5679) Flaky tests in InputOutputMetricsSuite: input metrics with interleaved reads and input metrics with mixed read method
[ https://issues.apache.org/jira/browse/SPARK-5679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312600#comment-14312600 ] Kostas Sakellis commented on SPARK-5679: I have tried to repo this in a number of different ways and failed: 1. On mac with sbt and without sbt. 2. On centos 6x using sbt and without sbt 3. On ubuntu using sbt and without sbt Yet it is reproducible on the build machines but only for hadoop 2.2. As [~joshrosen] pointed out, it might be some shared state that is specific to older versions of hadoop. How do we feel about changing this test suite so that it doesn't use a shared spark context. I know it will slow down the test a bit but might be the easiest way. Flaky tests in InputOutputMetricsSuite: input metrics with interleaved reads and input metrics with mixed read method -- Key: SPARK-5679 URL: https://issues.apache.org/jira/browse/SPARK-5679 Project: Spark Issue Type: Bug Components: Spark Core, Tests Affects Versions: 1.3.0 Reporter: Patrick Wendell Assignee: Kostas Sakellis Priority: Blocker Please audit these and see if there are any assumptions with respect to File IO that might not hold in all cases. I'm happy to help if you can't find anything. These both failed in the same run: https://amplab.cs.berkeley.edu/jenkins/view/Spark/job/Spark-1.3-SBT/38/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=centos/#showFailuresLink {code} org.apache.spark.metrics.InputOutputMetricsSuite.input metrics with mixed read method Failing for the past 13 builds (Since Failed#26 ) Took 48 sec. Error Message 2030 did not equal 6496 Stacktrace sbt.ForkMain$ForkError: 2030 did not equal 6496 at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555) at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466) at org.apache.spark.metrics.InputOutputMetricsSuite$$anonfun$9.apply$mcV$sp(InputOutputMetricsSuite.scala:135) at org.apache.spark.metrics.InputOutputMetricsSuite$$anonfun$9.apply(InputOutputMetricsSuite.scala:113) at org.apache.spark.metrics.InputOutputMetricsSuite$$anonfun$9.apply(InputOutputMetricsSuite.scala:113) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) at org.apache.spark.metrics.InputOutputMetricsSuite.org$scalatest$BeforeAndAfter$$super$runTest(InputOutputMetricsSuite.scala:46) at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200) at org.apache.spark.metrics.InputOutputMetricsSuite.runTest(InputOutputMetricsSuite.scala:46) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) at
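To illustrate the suggestion above of dropping the shared SparkContext in InputOutputMetricsSuite, here is a minimal ScalaTest sketch that gives each test its own context. It is an illustration only, not the actual suite; the class name and test body are hypothetical.
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Sketch: give each test its own SparkContext so no shared state
// (metrics, FileSystem statistics, etc.) leaks between tests.
// Slower than a shared context, but removes cross-test interference.
class PerTestContextSuiteSketch extends FunSuite with BeforeAndAfterEach {
  private var sc: SparkContext = _

  override def beforeEach(): Unit = {
    sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("metrics-test"))
  }

  override def afterEach(): Unit = {
    if (sc != null) {
      sc.stop()   // tear down the context (and its state) after every test
      sc = null
    }
  }

  test("input metrics with mixed read method (sketch)") {
    val sum = sc.parallelize(1 to 100, 4).sum()
    assert(sum == 5050)
  }
}
{code}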
[jira] [Commented] (SPARK-5647) Output metrics do not show up for older hadoop versions (< 2.5)
[ https://issues.apache.org/jira/browse/SPARK-5647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312818#comment-14312818 ] Kostas Sakellis commented on SPARK-5647: I'm not sure if this is possible with older hadoop. Need to do some investigation. We could possibly check the final file size after it has been written? [~sandyr] had some ideas when I talked to him about it. Output metrics do not show up for older hadoop versions (< 2.5) --- Key: SPARK-5647 URL: https://issues.apache.org/jira/browse/SPARK-5647 Project: Spark Issue Type: New Feature Components: Spark Core Reporter: Kostas Sakellis Need to add output metrics for hadoop < 2.5. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
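The "check the final file size after it has been written" idea above could look roughly like the sketch below, using the Hadoop FileSystem API as a fallback when no bytes-written callback exists (Hadoop < 2.5). This is only an assumption about the approach, not the change that eventually landed; the object and method names are made up.
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object OutputSizeFallback {
  // Hypothetical fallback for Hadoop < 2.5, where no bytes-written callback
  // exists: after the task commits its output, stat the written path and use
  // its length as the bytesWritten metric.
  def bytesWrittenAt(outputPath: String, conf: Configuration): Long = {
    val path = new Path(outputPath)
    val fs = FileSystem.get(path.toUri, conf)
    if (!fs.exists(path)) 0L
    // getContentSummary works for both a single file and a directory of part files
    else fs.getContentSummary(path).getLength
  }
}
{code}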
[jira] [Created] (SPARK-5674) Spark Job Explain Plan Proof of Concept
Kostas Sakellis created SPARK-5674: -- Summary: Spark Job Explain Plan Proof of Concept Key: SPARK-5674 URL: https://issues.apache.org/jira/browse/SPARK-5674 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Kostas Sakellis This is just a prototype of creating an explain plan for a job. Code can be found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc The code was written very quickly and so doesn't have any comments or tests and is probably buggy - hence it being a proof of concept.

*How to Use*
# {code}sc.explainOn = sc.explainOff{code} This will generate the explain plan and print it in the logs
# {code}sc.enableExecution = sc.disableExecution{code} This will disable execution of the job.
Using these two knobs a user can choose to print the explain plan and/or disable the running of the job if they only want to see the plan.

*Implementation* This is only a prototype and it is by no means production ready. The code is pretty hacky in places and a few shortcuts were made just to get the prototype working. The most interesting part of this commit is in the ExecutionPlanner.scala class. This class creates its own private instance of the DAGScheduler and passes into it a NoopTaskScheduler. The NoopTaskScheduler receives the created TaskSets from the DAGScheduler and records the stages -> tasksets. The NoopTaskScheduler also creates fake CompletionEvents and sends them to the DAGScheduler to move the scheduling along. It is done this way so that we can use the DAGScheduler unmodified, thus reducing code divergence. The rest of the code is about processing the information produced by the ExecutionPlanner, creating a DAG with a bunch of metadata and printing it as a pretty ascii drawing. For drawing the DAG, https://github.com/mdr/ascii-graphs is used. This was, again, just easier to prototype.

*How is this different than RDD#toDebugString?* The execution planner runs the job through the entire DAGScheduler so we can collect some metrics that are not presently available in the debugString. For example, we can report the binary size of the task, which might be important if the closures are referencing large objects. In addition, because we execute the scheduler code from an action, we can get a more accurate picture of where the stage boundaries and dependencies are. An action such as treeReduce will generate a number of stages that you can't get just by doing .toDebugString on the rdd.

*Limitations of this Implementation* Because this is a prototype, there is a lot of lame stuff in this commit.
# All of the code in SparkContext in particular sucks. This adds some code in the runJob() call and when it gets the plan it just writes it to the INFO log. We need to find a better way of exposing the plan to the caller so that they can print it, analyze it etc. Maybe we can use implicits or something? Not sure how best to do this yet.
# Some of the actions will fail with exceptions because we are basically faking a runJob(). If you want to try this, it is best to just use count() instead of, say, collect(). This will get fixed when we fix 1)
# Because the ExplainPlanner creates its own DAGScheduler, there currently is no way to map the real stages to the explainPlan stages. So if a user turns on explain plan, and doesn't disable execution, we can't automatically add more metrics to the explain plan as they become available. The stageId in the plan and the stageId in the real scheduler will be different. This is important for when we add it to the webUI and users can track progress on the DAG
# We are using https://github.com/mdr/ascii-graphs to draw the DAG - not sure if we want to depend on that project.

*Next Steps*
# It would be good to get a few people to take a look at the code, specifically at how the plan gets generated. Clone the package and give it a try with some of your jobs
# If the approach looks okay overall, I can put together a mini design doc and add some answers to the above limitations of this approach.
# Feedback most welcome.

*Example Code:*
{code}
sc.explainOn
sc.disableExecution

val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))

rdd.join(rdd2)
   .count()
{code}

*Example Output:*
{noformat}
EXPLAIN PLAN:
+-----------------+   +-----------------+
|                 |   |                 |
| Stage: 0 @ map  |   | Stage: 1 @ map  |
|   Tasks: 4      |   |   Tasks: 2      |
|                 |   |                 |
+-----------------+   +-----------------+
         |                     |
         v                     v
        +-----------------------+
        |                       |
        |  Stage: 2 @ count     |
        |  Tasks: 4             |
        |                       |
        +-----------------------+

STAGE DETAILS:
--------------
Stage: 0
[jira] [Created] (SPARK-5647) Output metrics do not show up for older hadoop versions (< 2.5)
Kostas Sakellis created SPARK-5647: -- Summary: Output metrics do not show up for older hadoop versions (< 2.5) Key: SPARK-5647 URL: https://issues.apache.org/jira/browse/SPARK-5647 Project: Spark Issue Type: New Feature Components: Spark Core Reporter: Kostas Sakellis Need to add output metrics for hadoop < 2.5. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-5645) Track local bytes read for shuffles - update UI
Kostas Sakellis created SPARK-5645: -- Summary: Track local bytes read for shuffles - update UI Key: SPARK-5645 URL: https://issues.apache.org/jira/browse/SPARK-5645 Project: Spark Issue Type: New Feature Components: Spark Core Reporter: Kostas Sakellis Currently we do not track the local bytes read for a shuffle read. The UI only shows the remote bytes read. This is pretty confusing to the user because: 1) In local mode all shuffle reads are local 2) the shuffle bytes written from the previous stage might not add up if there are some bytes that are read locally on the shuffle read side 3) With https://github.com/apache/spark/pull/4067 we display the total number of records so that won't line up with only showing the remote bytes read. I propose we track the remote and local bytes read separately. In the UI show the total bytes read and in brackets show the remote bytes read for a shuffle. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
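A minimal sketch of the proposal in SPARK-5645: keep remote and local shuffle bytes as separate counters and render the UI value as the total with the remote bytes in brackets. The case class and field names below are hypothetical, not the real ShuffleReadMetrics API.
{code}
// Hypothetical shape of per-task shuffle read metrics that keeps local and
// remote bytes separate, as proposed above.
case class ShuffleReadBytes(remoteBytesRead: Long, localBytesRead: Long) {
  def totalBytesRead: Long = remoteBytesRead + localBytesRead

  // UI rendering idea from the proposal: total first, remote in brackets.
  def uiString: String = s"$totalBytesRead B ($remoteBytesRead B remote)"
}

object ShuffleReadBytes {
  def example(): Unit = {
    val m = ShuffleReadBytes(remoteBytesRead = 1024, localBytesRead = 4096)
    println(m.uiString)  // prints: 5120 B (1024 B remote)
  }
}
{code}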
[jira] [Created] (SPARK-5646) Record output metrics for cache
Kostas Sakellis created SPARK-5646: -- Summary: Record output metrics for cache Key: SPARK-5646 URL: https://issues.apache.org/jira/browse/SPARK-5646 Project: Spark Issue Type: New Feature Components: Spark Core Reporter: Kostas Sakellis We currently show the input metrics when coming from the cache but we don't track/show the output metrics when we write to the cache -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5081) Shuffle write increases
[ https://issues.apache.org/jira/browse/SPARK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14307584#comment-14307584 ] Kostas Sakellis commented on SPARK-5081: Can you add a sample of the code too? Shuffle write increases --- Key: SPARK-5081 URL: https://issues.apache.org/jira/browse/SPARK-5081 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Kevin Jung The size of shuffle write shown in the spark web UI is much different when I execute the same spark job with the same input data in both spark 1.1 and spark 1.2. At the sortBy stage, the size of shuffle write is 98.1MB in spark 1.1 but 146.9MB in spark 1.2. I set the spark.shuffle.manager option to hash because its default value has changed, but spark 1.2 still writes more shuffle output than spark 1.1. It can increase disk I/O overhead dramatically as the input file gets bigger and it causes the jobs to take more time to complete. In the case of about 100GB input, for example, the size of shuffle write is 39.7GB in spark 1.1 but 91.0GB in spark 1.2.
spark 1.1
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|9|saveAsTextFile| |1169.4KB| |
|12|combineByKey| |1265.4KB|1275.0KB|
|6|sortByKey| |1276.5KB| |
|8|mapPartitions| |91.0MB|1383.1KB|
|4|apply| |89.4MB| |
|5|sortBy|155.6MB| |98.1MB|
|3|sortBy|155.6MB| | |
|1|collect| |2.1MB| |
|2|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |
spark 1.2
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|12|saveAsTextFile| |1170.2KB| |
|11|combineByKey| |1264.5KB|1275.0KB|
|8|sortByKey| |1273.6KB| |
|7|mapPartitions| |134.5MB|1383.1KB|
|5|zipWithIndex| |132.5MB| |
|4|sortBy|155.6MB| |146.9MB|
|3|sortBy|155.6MB| | |
|2|collect| |2.0MB| |
|1|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5557) Servlet API classes now missing after jetty shading
[ https://issues.apache.org/jira/browse/SPARK-5557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14308374#comment-14308374 ] Kostas Sakellis commented on SPARK-5557: [~pwendell] recommended this which did the trick:
{code}
diff --git a/core/pom.xml b/core/pom.xml
index 2dc5f74..f03ec47 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -132,6 +132,11 @@
       <artifactId>jetty-servlet</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty.orbit</groupId>
+      <artifactId>javax.servlet</artifactId>
+      <version>3.0.0.v201112011016</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
{code}
Servlet API classes now missing after jetty shading --- Key: SPARK-5557 URL: https://issues.apache.org/jira/browse/SPARK-5557 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.3.0 Reporter: Guoqiang Li Priority: Blocker the log: {noformat} 5/02/03 19:06:39 INFO spark.HttpServer: Starting HTTP Server Exception in thread "main" java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:75) at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62) at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1774) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1765) at org.apache.spark.HttpServer.start(HttpServer.scala:62) at org.apache.spark.repl.SparkIMain.<init>(SparkIMain.scala:130) at org.apache.spark.repl.SparkILoop$SparkILoopInterpreter.<init>(SparkILoop.scala:185) at org.apache.spark.repl.SparkILoop.createInterpreter(SparkILoop.scala:214) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:946) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:942) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1039) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:403) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: javax.servlet.http.HttpServletResponse at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 
25 more {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4630) Dynamically determine optimal number of partitions
[ https://issues.apache.org/jira/browse/SPARK-4630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14284341#comment-14284341 ] Kostas Sakellis commented on SPARK-4630: I agree that this should be built not assuming SchemaRDD. Like Dryad and Tez (which is basically Dryad) we should be able to use runtime statistics (as opposed to metastore stats) to compute the optimal partition numbers. I'm no Dryad expert but simply read through their papers: 1) Dryad: http://research.microsoft.com/en-us/projects/dryad/eurosys07.pdf 2) DryadLinq: http://research.microsoft.com/en-us/projects/DryadLINQ/DryadLINQ.pdf 3) Optimus: http://research.microsoft.com/pubs/185714/Optimus.pdf The DryadLinq paper has a small section on dynamic partitioning. Section 4.2.2: {quote} Dynamic data partitioning sets the number of vertices in each stage (i.e., the number of partitions of each dataset) at run time based on the size of its input data. Traditional databases usually estimate dataset sizes statically, but these estimates can be very inaccurate, for example in the presence of correlated queries. DryadLINQ supports dynamic hash and range partitions—for range partitions both the number of partitions and the partitioning key ranges are determined at run time by sampling the input dataset. {quote} The Optimus paper talks about more optimizations they did in their system that runs on top of Dryad. There are a lot of optimizations but dynamic partitioning is talked about in Section 3.1. They describe creating a set of sampled histograms, one for each dependent partition, and then depending on the operation choose a combining strategy for the statistics. For example, for joins they do the product of the histograms. Using the stats from the histograms they determine how many vertices (partitions) to add to the graph processing. The paper they reference for creating the sampling histogram is http://www.mathcs.emory.edu/~cheung/papers/StreamDB/Histogram/1998-Chaudhuri-Histo.pdf - I haven't read it yet. They don't really get into how they bootstrap this - sampling the original datasources stored in the filesystem. From what I can tell, [~lianhuiwang]'s patch assumes that all records are the same size since it solely looks at the map status and hadoop input sizes. I don't think this is good enough to make intelligent decisions as you also need to look at the record sizes to be able to prevent skew. The partial DAG execution described in the Shark paper is similar to what Dryad does. [~rxin], why was this not pushed down to core Spark? Partial DAG execution could allow us to have a number of runtime optimizations that are currently not possible. Dynamically determine optimal number of partitions -- Key: SPARK-4630 URL: https://issues.apache.org/jira/browse/SPARK-4630 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Kostas Sakellis Assignee: Kostas Sakellis Partition sizes play a big part in how fast stages execute during a Spark job. There is a direct relationship between the size of partitions and the number of tasks - larger partitions, fewer tasks. For better performance, Spark has a sweet spot for how large the partitions that get executed by a task should be. If partitions are too small, then the user pays a disproportionate cost in scheduling overhead. If the partitions are too large, then task execution slows down due to gc pressure and spilling to disk.
To increase performance of jobs, users often hand optimize the number (size) of partitions that the next stage gets. Factors that come into play are: incoming partition sizes from the previous stage, the number of available executors, and the available memory per executor (taking into account spark.shuffle.memoryFraction). Spark has access to this data and so should be able to automatically do the partition sizing for the user. This feature can be turned off/on with a configuration option. To make this happen, we propose modifying the DAGScheduler to take into account partition sizes upon stage completion. Before scheduling the next stage, the scheduler can examine the sizes of the partitions and determine the appropriate number of tasks to create. Since this change requires non-trivial modifications to the DAGScheduler, a detailed design doc will be attached before proceeding with the work. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
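To make the SPARK-4630 idea concrete, here is a rough sketch of deriving a partition count for the next stage from the previous stage's map output sizes and a target partition size. It deliberately ignores record sizes and skew (the concern raised in the comment above) and is not the proposed DAGScheduler change; all names are hypothetical.
{code}
object PartitionSizing {
  // Pick a partition count so that, on average, each partition of the next
  // stage sees roughly `targetBytesPerPartition` of the previous stage's
  // map output. Ignores skew entirely -- per-key histograms (as in the
  // Optimus paper) would be needed to handle that.
  def targetNumPartitions(mapOutputSizes: Seq[Long],
                          targetBytesPerPartition: Long,
                          maxPartitions: Int): Int = {
    val totalBytes = mapOutputSizes.sum
    val ideal = math.ceil(totalBytes.toDouble / targetBytesPerPartition).toInt
    math.min(math.max(ideal, 1), maxPartitions)
  }
}

// e.g. 10 map outputs of 512MB each, aiming for ~128MB partitions => 40
// PartitionSizing.targetNumPartitions(Seq.fill(10)(512L << 20), 128L << 20, 2000)
{code}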
[jira] [Created] (SPARK-5225) Support coalesced Input Metrics from different sources
Kostas Sakellis created SPARK-5225: -- Summary: Support coalesced Input Metrics from different sources Key: SPARK-5225 URL: https://issues.apache.org/jira/browse/SPARK-5225 Project: Spark Issue Type: Bug Reporter: Kostas Sakellis Currently, if a task reads data from more than one block and it is from different read methods we ignore the second read method's bytes. For example: CoalescedRDD | Task1 / | \ hadoop hadoop cached if Task1 starts reading from the hadoop blocks first, then the input metrics for Task1 will only contain input metrics from the hadoop blocks and ignore the input metrics from cached blocks. We need to change the way we collect input metrics so that it is not a single value but rather a collection of input metrics for a task. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-5225) Support coalesced Input Metrics from different sources
[ https://issues.apache.org/jira/browse/SPARK-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Sakellis updated SPARK-5225: --- Description: Currently, if a task reads data from more than one block and it is from different read methods we ignore the second read method's bytes. For example:
{noformat}
      CoalescedRDD
           |
         Task1
        /  |  \
  hadoop hadoop cached
{noformat}
if Task1 starts reading from the hadoop blocks first, then the input metrics for Task1 will only contain input metrics from the hadoop blocks and ignore the input metrics from cached blocks. We need to change the way we collect input metrics so that it is not a single value but rather a collection of input metrics for a task. was: Currently, if a task reads data from more than one block and it is from different read methods we ignore the second read method's bytes. For example: CoalescedRDD | Task1 / | \ hadoop hadoop cached if Task1 starts reading from the hadoop blocks first, then the input metrics for Task1 will only contain input metrics from the hadoop blocks and ignore the input metrics from cached blocks. We need to change the way we collect input metrics so that it is not a single value but rather a collection of input metrics for a task. Support coalesced Input Metrics from different sources - Key: SPARK-5225 URL: https://issues.apache.org/jira/browse/SPARK-5225 Project: Spark Issue Type: Bug Reporter: Kostas Sakellis Currently, if a task reads data from more than one block and it is from different read methods we ignore the second read method's bytes. For example:
{noformat}
      CoalescedRDD
           |
         Task1
        /  |  \
  hadoop hadoop cached
{noformat}
if Task1 starts reading from the hadoop blocks first, then the input metrics for Task1 will only contain input metrics from the hadoop blocks and ignore the input metrics from cached blocks. We need to change the way we collect input metrics so that it is not a single value but rather a collection of input metrics for a task. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
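A small sketch of the "collection of input metrics for a task" idea from SPARK-5225: accumulate bytes per read method instead of keeping a single value that the last reader overwrites. The types below are illustrative only, not Spark's InputMetrics classes.
{code}
import scala.collection.mutable

// Hypothetical read methods, mirroring where a block can come from.
object ReadMethod extends Enumeration {
  val Hadoop, Memory, Disk, Network = Value
}

// Sketch: one accumulator per (task, read method) instead of a single
// InputMetrics object, so a coalesced task that reads hadoop + cached
// blocks does not lose either count.
class TaskInputMetrics {
  private val bytesByMethod =
    mutable.Map.empty[ReadMethod.Value, Long].withDefaultValue(0L)

  def addBytesRead(method: ReadMethod.Value, bytes: Long): Unit =
    bytesByMethod(method) += bytes

  def totalBytesRead: Long = bytesByMethod.values.sum
  def breakdown: Map[ReadMethod.Value, Long] = bytesByMethod.toMap
}
{code}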
[jira] [Created] (SPARK-5190) Allow spark listeners to be added before spark context gets initialized.
Kostas Sakellis created SPARK-5190: -- Summary: Allow spark listeners to be added before spark context gets initialized. Key: SPARK-5190 URL: https://issues.apache.org/jira/browse/SPARK-5190 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Kostas Sakellis Currently, you need the spark context to add spark listener events. But, if you wait until the spark context gets created before adding your listener you might miss events like blockManagerAdded or executorAdded. We should fix this so you can attach a listener to the spark context before it starts any initialization. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
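For context, a listener only sees events posted after it is registered, so attaching it once the SparkContext is fully constructed misses the startup events mentioned above. The sketch below shows such a listener; the conf-based early registration in the usage comment is an assumption about how this could be exposed, not an API that existed when this issue was filed.
{code}
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

// Sketch: a listener that wants to see every block manager registration.
// If it is attached only after SparkContext is fully constructed, the
// events fired during startup have already been posted and are lost.
class StartupListener extends SparkListener {
  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
    println(s"block manager added: ${event.blockManagerId}")
  }
}

// Hypothetical usage once early registration is supported, e.g. via a conf
// key read during SparkContext construction (the key name is an assumption):
//   conf.set("spark.extraListeners", classOf[StartupListener].getName)
//   val sc = new SparkContext(conf)
{code}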
[jira] [Created] (SPARK-4874) Report number of records read/written in a task
Kostas Sakellis created SPARK-4874: -- Summary: Report number of records read/written in a task Key: SPARK-4874 URL: https://issues.apache.org/jira/browse/SPARK-4874 Project: Spark Issue Type: Improvement Reporter: Kostas Sakellis This metric will help us find key skew using the WebUI -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4874) Report number of records read/written in a task
[ https://issues.apache.org/jira/browse/SPARK-4874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14250304#comment-14250304 ] Kostas Sakellis commented on SPARK-4874: I'm working on this. Report number of records read/written in a task --- Key: SPARK-4874 URL: https://issues.apache.org/jira/browse/SPARK-4874 Project: Spark Issue Type: Improvement Reporter: Kostas Sakellis This metric will help us find key skew using the WebUI -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4857) Add Executor Events to SparkListener
[ https://issues.apache.org/jira/browse/SPARK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14247578#comment-14247578 ] Kostas Sakellis commented on SPARK-4857: I'll work on this. Add Executor Events to SparkListener Key: SPARK-4857 URL: https://issues.apache.org/jira/browse/SPARK-4857 Project: Spark Issue Type: Improvement Reporter: Kostas Sakellis We need to add events to the SparkListener to indicate an executor has been added or removed with corresponding information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4857) Add Executor Events to SparkListener
Kostas Sakellis created SPARK-4857: -- Summary: Add Executor Events to SparkListener Key: SPARK-4857 URL: https://issues.apache.org/jira/browse/SPARK-4857 Project: Spark Issue Type: Improvement Reporter: Kostas Sakellis We need to add events to the SparkListener to indicate an executor has been added or removed with corresponding information. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
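A rough sketch of what executor lifecycle events on the listener bus might look like; the case class and field names are assumptions, not the final SparkListener API.
{code}
// Hypothetical event shapes for executor lifecycle notifications on the
// SparkListener bus; all names here are illustrative only.
case class ExecutorInfoSketch(host: String, totalCores: Int, logUrls: Map[String, String])

sealed trait ExecutorEventSketch
case class ExecutorAddedSketch(time: Long, executorId: String, info: ExecutorInfoSketch)
  extends ExecutorEventSketch
case class ExecutorRemovedSketch(time: Long, executorId: String, reason: String)
  extends ExecutorEventSketch

// A listener interested in these events would pattern match on them, e.g.:
// case ExecutorAddedSketch(t, id, info) =>
//   println(s"executor $id on ${info.host} with ${info.totalCores} cores")
{code}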
[jira] [Created] (SPARK-4843) Squash ExecutorRunnable and ExecutorRunnableUtil hierarchy in yarn module
Kostas Sakellis created SPARK-4843: -- Summary: Squash ExecutorRunnable and ExecutorRunnableUtil hierarchy in yarn module Key: SPARK-4843 URL: https://issues.apache.org/jira/browse/SPARK-4843 Project: Spark Issue Type: Improvement Reporter: Kostas Sakellis ExecutorRunnableUtil is a parent of ExecutorRunnable because of the yarn-alpha and yarn-stable split. Now that yarn-alpha is gone, we can squash the unnecessary hierarchy. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4774) Make HiveFromSpark example more portable
Kostas Sakellis created SPARK-4774: -- Summary: Make HiveFromSpark example more portable Key: SPARK-4774 URL: https://issues.apache.org/jira/browse/SPARK-4774 Project: Spark Issue Type: Bug Components: Examples Reporter: Kostas Sakellis The HiveFromSpark example needs the kv1.txt file to run in a specific location: SPARK_HOME/examples/src/main/resources/kv1.txt which assumes you have the source tree checked out. A more portable way is to copy the kv1.txt to a temporary file that gets cleaned up when the jvm shuts down. This would allow us to run this example outside of a source tree. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
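The temp-file approach described in SPARK-4774 could look roughly like this sketch: copy kv1.txt from the example jar's classpath to a temporary file and let the JVM delete it on exit. The resource path and object name are assumptions.
{code}
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object Kv1TempFile {
  // Copy the bundled kv1.txt resource (assumed to be on the example jar's
  // classpath) to a temp file that is deleted when the JVM shuts down,
  // so the example no longer depends on a checked-out source tree.
  def materialize(): String = {
    val in = getClass.getResourceAsStream("/kv1.txt")
    require(in != null, "kv1.txt not found on the classpath")
    val tmp = File.createTempFile("kv1", ".txt")
    tmp.deleteOnExit()
    try Files.copy(in, tmp.toPath, StandardCopyOption.REPLACE_EXISTING)
    finally in.close()
    tmp.getAbsolutePath
  }
}
{code}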
[jira] [Updated] (SPARK-4774) Make HiveFromSpark example more portable
[ https://issues.apache.org/jira/browse/SPARK-4774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Sakellis updated SPARK-4774: --- Component/s: SQL Make HiveFromSpark example more portable Key: SPARK-4774 URL: https://issues.apache.org/jira/browse/SPARK-4774 Project: Spark Issue Type: Bug Components: Examples, SQL Reporter: Kostas Sakellis The HiveFromSpark example needs the kv1.txt file to run in a specific location: SPARK_HOME/examples/src/main/resources/kv1.txt which assumes you have the source tree checked out. A more portable way is to copy the kv1.txt to a temporary file that gets cleaned up when the jvm shutsdown. This would allow us to run this example outside of a source tree. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-4630) Dynamically determine optimal number of partitions
Kostas Sakellis created SPARK-4630: -- Summary: Dynamically determine optimal number of partitions Key: SPARK-4630 URL: https://issues.apache.org/jira/browse/SPARK-4630 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Kostas Sakellis Partition sizes play a big part in how fast stages execute during a Spark job. There is a direct relationship between the size of partitions and the number of tasks - larger partitions, fewer tasks. For better performance, Spark has a sweet spot for how large the partitions that get executed by a task should be. If partitions are too small, then the user pays a disproportionate cost in scheduling overhead. If the partitions are too large, then task execution slows down due to gc pressure and spilling to disk. To increase performance of jobs, users often hand optimize the number (size) of partitions that the next stage gets. Factors that come into play are: incoming partition sizes from the previous stage, the number of available executors, and the available memory per executor (taking into account spark.shuffle.memoryFraction). Spark has access to this data and so should be able to automatically do the partition sizing for the user. This feature can be turned off/on with a configuration option. To make this happen, we propose modifying the DAGScheduler to take into account partition sizes upon stage completion. Before scheduling the next stage, the scheduler can examine the sizes of the partitions and determine the appropriate number of tasks to create. Since this change requires non-trivial modifications to the DAGScheduler, a detailed design doc will be attached before proceeding with the work. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2450) Provide link to YARN executor logs on UI
[ https://issues.apache.org/jira/browse/SPARK-2450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14209350#comment-14209350 ] Kostas Sakellis commented on SPARK-2450: I'll pick up the work on this. Provide link to YARN executor logs on UI Key: SPARK-2450 URL: https://issues.apache.org/jira/browse/SPARK-2450 Project: Spark Issue Type: Improvement Components: Web UI, YARN Affects Versions: 1.0.0 Reporter: Bill Havanki Priority: Minor When running under YARN, provide links to executor logs from the web UI to avoid the need to drill down through the YARN UI. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4092) Input metrics don't work for coalesce()'d RDD's
[ https://issues.apache.org/jira/browse/SPARK-4092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14207190#comment-14207190 ] Kostas Sakellis commented on SPARK-4092: [~aash], yes this should solve a superset of the same problems that SPARK-2630 aims to fix. I say superset because https://github.com/apache/spark/pull/3120 also includes a similar fix when hadoop 2.5 is used with the bytesReadCallback. Input metrics don't work for coalesce()'d RDD's --- Key: SPARK-4092 URL: https://issues.apache.org/jira/browse/SPARK-4092 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Patrick Wendell Assignee: Kostas Sakellis Priority: Critical In every case where we set input metrics (from both Hadoop and block storage) we currently assume that exactly one input partition is computed within the task. This is not a correct assumption in the general case. The main example in the current API is coalesce(), but user-defined RDD's could also be affected. To deal with the most general case, we would need to support the notion of a single task having multiple input sources. A more surgical and less general fix is to simply go to HadoopRDD and check if there are already inputMetrics defined for the task with the same type. If there are, then merge in the new data rather than blowing away the old one. This wouldn't cover case where, e.g. a single task has input from both on-disk and in-memory blocks. It _would_ cover the case where someone calls coalesce on a HadoopRDD... which is more common. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4079) Snappy bundled with Spark does not work on older Linux distributions
[ https://issues.apache.org/jira/browse/SPARK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14192188#comment-14192188 ] Kostas Sakellis commented on SPARK-4079: yes, I'm taking this over from Marcelo. Snappy bundled with Spark does not work on older Linux distributions Key: SPARK-4079 URL: https://issues.apache.org/jira/browse/SPARK-4079 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.0.0 Reporter: Marcelo Vanzin This issue has existed at least since 1.0, but has been made worse by 1.1 since snappy is now the default compression algorithm. When trying to use it on a CentOS 5 machine, for example, you'll get something like this: {noformat} java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:319) at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:226) at org.xerial.snappy.Snappy.clinit(Snappy.java:48) at org.xerial.snappy.SnappyOutputStream.init(SnappyOutputStream.java:79) at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207) ... Caused by: java.lang.UnsatisfiedLinkError: /tmp/snappy-1.0.5.3-af72bf3c-9dab-43af-a662-f9af657f06b1-libsnappyjava.so: /usr/lib64/libstdc++.so.6: version `GLIBCXX_3.4.9' not found (required by /tmp/snappy-1.0.5.3-af72bf3c-9dab-43af-a662-f9af657f06b1-libsnappyjava.so) at java.lang.ClassLoader$NativeLibrary.load(Native Method) at java.lang.ClassLoader.loadLibrary1(ClassLoader.java:1957) at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1882) at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1843) at java.lang.Runtime.load0(Runtime.java:795) at java.lang.System.load(System.java:1061) at org.xerial.snappy.SnappyNativeLoader.load(SnappyNativeLoader.java:39) ... 29 more {noformat} There are two approaches I can see here (well, 3): * Declare CentOS 5 (and similar OSes) not supported, although that would suck for the people who are still on it and already use Spark * Fallback to another compression codec if Snappy cannot be loaded * Ask the Snappy guys to compile the library on an older OS... I think the second would be the best compromise. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
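Until a fallback inside Spark exists, the practical workaround on such hosts is to avoid the Snappy native library altogether by picking a pure-JVM codec. A hedged sketch follows; it assumes the lzf codec (the pre-1.1 default) is still available in the deployed version.
{code}
import org.apache.spark.{SparkConf, SparkContext}

object SnappyWorkaround {
  def main(args: Array[String]): Unit = {
    // Workaround sketch for hosts with an old libstdc++: skip the bundled
    // Snappy native library by choosing a pure-JVM compression codec.
    // "lzf" was the default codec before Snappy became the default in 1.1.
    val conf = new SparkConf()
      .setAppName("snappy-workaround")
      .set("spark.io.compression.codec", "lzf")
    val sc = new SparkContext(conf)
    try println(sc.parallelize(1 to 1000, 10).map(_ * 2).sum())
    finally sc.stop()
  }
}
{code}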
[jira] [Commented] (SPARK-4092) Input metrics don't work for coalesce()'d RDD's
[ https://issues.apache.org/jira/browse/SPARK-4092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14190974#comment-14190974 ] Kostas Sakellis commented on SPARK-4092: The current patch I'm working on does the simplest thing to address this issue. In the hadoop rdds and cache manager, if the task already has input metrics of the same read method it will increment them instead of overriding. This simple solution should handle the common case of: {code} sc.textFile(..).coalesce(5).collect() {code} In addition it will cover the situation where blocks are coming from the cache. What it won't handle properly is if coalesce (or other rdds with similar properties) reads from multiple blocks of mixed read methods (memory vs. hadoop). In that case, one input metric will override the other. We have several options: # We create a MIXED readMethod and if we see input metrics from different methods, we change to MIXED. This will lose some information because now we don't know where the blocks were read from. # We store multiple inputMetrics for each TaskContext. Up the stack (e.g. JsonProtocol) we send the array of InputMetrics to the caller. We have to worry about backwards compatibility in that case so we can't just remove the single inputMetric. We might have to send back a MIXED metric and, in addition, the array for newer clients. # We punt on this issue for now since it can be argued that it is not common. What are people's thoughts on this? Input metrics don't work for coalesce()'d RDD's --- Key: SPARK-4092 URL: https://issues.apache.org/jira/browse/SPARK-4092 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Patrick Wendell Assignee: Kostas Sakellis Priority: Critical In every case where we set input metrics (from both Hadoop and block storage) we currently assume that exactly one input partition is computed within the task. This is not a correct assumption in the general case. The main example in the current API is coalesce(), but user-defined RDD's could also be affected. To deal with the most general case, we would need to support the notion of a single task having multiple input sources. A more surgical and less general fix is to simply go to HadoopRDD and check if there are already inputMetrics defined for the task with the same type. If there are, then merge in the new data rather than blowing away the old one. This wouldn't cover the case where, e.g. a single task has input from both on-disk and in-memory blocks. It _would_ cover the case where someone calls coalesce on a HadoopRDD... which is more common. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
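To make option 1 above concrete, a tiny sketch of merge semantics where same-method metrics are summed and mixed-method metrics collapse to a MIXED marker, losing the per-source breakdown as noted. All names are hypothetical, not the actual InputMetrics implementation.
{code}
// Hypothetical merge rule for option 1 above: metrics from the same read
// method accumulate; metrics from different methods collapse to MIXED.
object ReadMethodSketch extends Enumeration {
  val Hadoop, Memory, Disk, Network, Mixed = Value
}

case class InputMetricsSketch(readMethod: ReadMethodSketch.Value, bytesRead: Long) {
  def merge(other: InputMetricsSketch): InputMetricsSketch = {
    val method =
      if (readMethod == other.readMethod) readMethod
      else ReadMethodSketch.Mixed  // breakdown is lost: the trade-off of option 1
    InputMetricsSketch(method, bytesRead + other.bytesRead)
  }
}

// e.g. InputMetricsSketch(Hadoop, 100).merge(InputMetricsSketch(Memory, 50))
//      == InputMetricsSketch(Mixed, 150)
{code}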
[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182110#comment-14182110 ] Kostas Sakellis commented on SPARK-1239: Apologies for not commenting on this JIRA sooner but, since I'm new to Spark, it has taken a bit of time to wrap my head around how the different schedulers (DAG and Task) interoperate. I'm currently investigating completely removing the MapOutputTracker class. Instead, the DAGScheduler can push down the required map statuses to the next stage. I'm thinking of modifying DAGScheduler.submitMissingTasks(..) to take in (or query) the map statuses of the previously completed stage. When a new ShuffleMapTask gets created, we pass the filtered map statuses necessary for that task. The map status data can be stored in the TaskContext and used in the BlockStoreShuffleFetcher when block data is being read (which currently uses the MapOutputTracker). What I'm investigating currently is how to filter the data when I create the ShuffleMapTask - the BlockStoreShuffleFetcher uses a shuffleId which I don't seem to have access to in the DAGScheduler. I haven't yet written any code to test this out so if anyone has any concerns please let me know. Also, [~joshrosen] pointed out that the size of the map output structure, even after it has been filtered, could still be very large. This is worth investigating. Don't fetch all map output statuses at each reducer during shuffles --- Key: SPARK-1239 URL: https://issues.apache.org/jira/browse/SPARK-1239 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 1.0.2, 1.1.0 Reporter: Patrick Wendell Assignee: Josh Rosen Instead we should modify the way we fetch map output statuses to take both a mapper and a reducer - or we should just piggyback the statuses on each task. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
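A toy model of the filtering idea described in the comment above: rather than each reducer fetching the full map output status table, the scheduler pre-filters, per reduce partition, only the (location, size) entries that partition needs and ships them with the task. This sketches the concept only; it is not the actual DAGScheduler or MapStatus code.
{code}
// Toy model of "push filtered map output statuses down with the task"
// instead of each reducer pulling the entire MapOutputTracker table.
case class BlockLocation(host: String, port: Int)

// For one completed map task: where it ran, and the output size destined
// for each reduce partition.
case class MapStatusSketch(location: BlockLocation, bytesPerReducer: Array[Long])

object MapStatusFiltering {
  // What the scheduler would compute per reduce task before launching it:
  // only the entries relevant to that reducer, not the full N-map table.
  def statusesForReducer(mapStatuses: Seq[MapStatusSketch],
                         reducerId: Int): Seq[(BlockLocation, Long)] = {
    mapStatuses
      .map(s => (s.location, s.bytesPerReducer(reducerId)))
      .filter { case (_, bytes) => bytes > 0 }  // skip empty map outputs
  }
}
{code}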
[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles
[ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14123883#comment-14123883 ] Kostas Sakellis commented on SPARK-1239: [~pwendell] I'd like to take a crack at this since it is affecting one of our customers. Don't fetch all map output statuses at each reducer during shuffles --- Key: SPARK-1239 URL: https://issues.apache.org/jira/browse/SPARK-1239 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Patrick Wendell Assignee: Andrew Or Fix For: 1.1.0 Instead we should modify the way we fetch map output statuses to take both a mapper and a reducer - or we should just piggyback the statuses on each task. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org