[jira] [Commented] (SPARK-13877) Consider removing Kafka modules from Spark / Spark Streaming

2016-03-20 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15200236#comment-15200236
 ] 

Kostas Sakellis commented on SPARK-13877:
-

Why have we not more seriously considered Spark subprojects? I think that makes 
more sense than pulling this functionality out of Spark completely. It would give 
these modules their own release trains and good separation from core 
Spark, while keeping them under a software governance model that we all understand. 

> Consider removing Kafka modules from Spark / Spark Streaming
> 
>
> Key: SPARK-13877
> URL: https://issues.apache.org/jira/browse/SPARK-13877
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, Streaming
>Affects Versions: 1.6.1
>Reporter: Hari Shreedharan
>
> Based on the discussion the PR for SPARK-13843 
> ([here|https://github.com/apache/spark/pull/11672#issuecomment-196553283]), 
> we should consider moving the Kafka modules out of Spark as well. 
> Providing newer functionality (like security) has become painful while 
> maintaining compatibility with older versions of Kafka. Moving this out 
> allows more flexibility, allowing users to mix and match Kafka and Spark 
> versions.






[jira] [Commented] (SPARK-13877) Consider removing Kafka modules from Spark / Spark Streaming

2016-03-20 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15200252#comment-15200252
 ] 

Kostas Sakellis commented on SPARK-13877:
-

How is this any different than creating a random repo anywhere else? 

> Consider removing Kafka modules from Spark / Spark Streaming
> 
>
> Key: SPARK-13877
> URL: https://issues.apache.org/jira/browse/SPARK-13877
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, Streaming
>Affects Versions: 1.6.1
>Reporter: Hari Shreedharan
>
> Based on the discussion the PR for SPARK-13843 
> ([here|https://github.com/apache/spark/pull/11672#issuecomment-196553283]), 
> we should consider moving the Kafka modules out of Spark as well. 
> Providing newer functionality (like security) has become painful while 
> maintaining compatibility with older versions of Kafka. Moving this out 
> allows more flexibility, allowing users to mix and match Kafka and Spark 
> versions.






[jira] [Commented] (SPARK-9197) Cached RDD partitions are lost when executors are dynamically deallocated

2015-07-21 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634748#comment-14634748
 ] 

Kostas Sakellis commented on SPARK-9197:


I think this is a duplicate of: https://issues.apache.org/jira/browse/SPARK-7955
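For reference, the knobs that the SPARK-7955 work introduced look roughly like this; a minimal sketch, assuming a release where that change has landed, with purely illustrative values:
{code}
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=1h \
  ...
{code}
Executors holding cached partitions then get the longer (cached) timeout, while idle executors without cached data are still reclaimed after the normal timeout.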

 Cached RDD partitions are lost when executors are dynamically deallocated
 -

 Key: SPARK-9197
 URL: https://issues.apache.org/jira/browse/SPARK-9197
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.4.1
Reporter: Ryan Williams
Priority: Minor

 Currently, dynamic allocation cleans up executors that have not run any tasks 
 for a certain amount of time.
 However, this often leads to cached RDD partitions being lost.
 Should dynamic allocation leave executors alone that have cached partitions? 
 Should this be configurable?
 Is there any interest in code that would shuffle cached partitions around in 
 preparation for executor-deallocation, to avoid this? Such logic could be 
 useful in general for maintaining persisted RDDs across executor churn.






[jira] [Updated] (SPARK-7161) Provide REST api to download event logs from History Server

2015-04-28 Thread Kostas Sakellis (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Sakellis updated SPARK-7161:
---
Component/s: (was: Streaming)
 Spark Core

 Provide REST api to download event logs from History Server
 ---

 Key: SPARK-7161
 URL: https://issues.apache.org/jira/browse/SPARK-7161
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.1
Reporter: Hari Shreedharan
Priority: Minor

 The idea is to tar up the logs and return the tar.gz file using a REST api. 
 This can be used for debugging even after the app is done.
 I am planning to take a look at this.
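A minimal sketch of the tarring step, assuming commons-compress is on the classpath (the helper and its arguments are illustrative, not actual History Server code):
{code}
import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream}
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
import org.apache.commons.compress.utils.IOUtils

// Illustrative helper: package an application's event log files into a .tar.gz
// that a REST endpoint could stream back to the client.
def tarEventLogs(logFiles: Seq[File], dest: File): Unit = {
  val tarOut = new TarArchiveOutputStream(
    new GzipCompressorOutputStream(new BufferedOutputStream(new FileOutputStream(dest))))
  try {
    logFiles.foreach { f =>
      tarOut.putArchiveEntry(new TarArchiveEntry(f, f.getName))
      val in = new FileInputStream(f)
      try IOUtils.copy(in, tarOut) finally in.close()
      tarOut.closeArchiveEntry()
    }
  } finally {
    tarOut.close()
  }
}
{code}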






[jira] [Commented] (SPARK-6506) python support yarn cluster mode requires SPARK_HOME to be set

2015-04-07 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14484250#comment-14484250
 ] 

Kostas Sakellis commented on SPARK-6506:


Here is the exception I saw when I ran the above job:
{code}
Traceback (most recent call last):
  File "pi.py", line 29, in <module>
    sc = SparkContext(appName="PythonPi")
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.23/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/context.py", line 108, in __init__
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.23/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/context.py", line 222, in _ensure_initialized
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.23/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/java_gateway.py", line 32, in launch_gateway
  File "/usr/lib64/python2.6/UserDict.py", line 22, in __getitem__
    raise KeyError(key)
KeyError: 'SPARK_HOME'
{code}

 python support yarn cluster mode requires SPARK_HOME to be set
 --

 Key: SPARK-6506
 URL: https://issues.apache.org/jira/browse/SPARK-6506
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.3.0
Reporter: Thomas Graves

 We added support for python running in yarn cluster mode in 
 https://issues.apache.org/jira/browse/SPARK-5173, but it requires that 
 SPARK_HOME be set in the environment variables for application master and 
 executor.  It doesn't have to be set to anything real, but it fails if it's not 
 set.  See the command at the end of: https://github.com/apache/spark/pull/3976






[jira] [Commented] (SPARK-6506) python support yarn cluster mode requires SPARK_HOME to be set

2015-04-06 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482414#comment-14482414
 ] 

Kostas Sakellis commented on SPARK-6506:


I ran into this issue too by running:
bq. spark-submit --master yarn-cluster examples/pi.py 4

It looks like I only had to set spark.yarn.appMasterEnv.SPARK_HOME=/bogus to 
get it going:
bq. spark-submit --conf spark.yarn.appMasterEnv.SPARK_HOME=/bogus --master 
yarn-cluster pi.py 4


 python support yarn cluster mode requires SPARK_HOME to be set
 --

 Key: SPARK-6506
 URL: https://issues.apache.org/jira/browse/SPARK-6506
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.3.0
Reporter: Thomas Graves

 We added support for python running in yarn cluster mode in 
 https://issues.apache.org/jira/browse/SPARK-5173, but it requires that 
 SPARK_HOME be set in the environment variables for application master and 
 executor.  It doesn't have to be set to anything real, but it fails if it's not 
 set.  See the command at the end of: https://github.com/apache/spark/pull/3976






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2015-03-08 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14352306#comment-14352306
 ] 

Kostas Sakellis commented on SPARK-1239:


How many reduce-side tasks do you have? Can you please attach your logs 
that show the OOM errors?

 Don't fetch all map output statuses at each reducer during shuffles
 ---

 Key: SPARK-1239
 URL: https://issues.apache.org/jira/browse/SPARK-1239
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 1.0.2, 1.1.0
Reporter: Patrick Wendell

 Instead we should modify the way we fetch map output statuses to take both a 
 mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Created] (SPARK-5721) Propagate missing external shuffle service errors to client

2015-02-10 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-5721:
--

 Summary: Propagate missing external shuffle service errors to 
client
 Key: SPARK-5721
 URL: https://issues.apache.org/jira/browse/SPARK-5721
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, YARN
Reporter: Kostas Sakellis


When spark.shuffle.service.enabled=true, the YARN AM expects to find an aux 
service running in the NodeManager. If it cannot find one, an exception like this 
appears in the application master logs:
{noformat}
Exception in thread "ContainerLauncher #0" Exception in thread 
"ContainerLauncher #1" java.lang.Error: 
org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The 
auxService:spark_shuffle does not exist
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The 
auxService:spark_shuffle does not exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206)
at 
org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:110)
at 
org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:65)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
... 2 more
java.lang.Error: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: 
The auxService:spark_shuffle does not exist
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The 
auxService:spark_shuffle does not exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
at 
org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206)
at 
org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:110)
at 
org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:65)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
... 2 more
{noformat}
We should propagate this error to the driver (in yarn-client mode) because it 
is otherwise unclear why the expected number of executors is not starting up.
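For reference, the NodeManager-side configuration that the AM is looking for is the standard external shuffle service setup in yarn-site.xml (the aux-services list below is illustrative; it depends on what else the cluster runs):
{code}
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
{code}
Surfacing the InvalidAuxServiceException in the driver would make it much easier to spot when this configuration is missing.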






[jira] [Commented] (SPARK-5679) Flaky tests in InputOutputMetricsSuite: input metrics with interleaved reads and input metrics with mixed read method

2015-02-09 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312600#comment-14312600
 ] 

Kostas Sakellis commented on SPARK-5679:


I have tried to repro this in a number of different ways and failed:

1. On Mac, with sbt and without sbt.
2. On CentOS 6.x, with sbt and without sbt.
3. On Ubuntu, with sbt and without sbt.

Yet it is reproducible on the build machines, but only for hadoop 2.2. As 
[~joshrosen] pointed out, it might be some shared state that is specific to 
older versions of hadoop. How do we feel about changing this test suite so that 
it doesn't use a shared SparkContext? I know it will slow down the tests a bit, 
but it might be the easiest way. 
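For illustration, a rough sketch of what dropping the shared context could look like, assuming the suite stays on ScalaTest's FunSuite (the suite and test bodies here are made up):
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterEach, FunSuite}

class InputOutputMetricsIsolatedSuite extends FunSuite with BeforeAndAfterEach {
  private var sc: SparkContext = _

  // Give every test its own context so no state leaks between tests.
  override def beforeEach(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("metrics-test"))
  }

  override def afterEach(): Unit = {
    if (sc != null) sc.stop()
    sc = null
  }

  test("input metrics with mixed read method") {
    val rdd = sc.parallelize(1 to 100, 4)
    assert(rdd.count() === 100)
  }
}
{code}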

 Flaky tests in InputOutputMetricsSuite: input metrics with interleaved reads 
 and input metrics with mixed read method 
 --

 Key: SPARK-5679
 URL: https://issues.apache.org/jira/browse/SPARK-5679
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Tests
Affects Versions: 1.3.0
Reporter: Patrick Wendell
Assignee: Kostas Sakellis
Priority: Blocker

 Please audit these and see if there are any assumptions with respect to File 
 IO that might not hold in all cases. I'm happy to help if you can't find 
 anything.
 These both failed in the same run:
 https://amplab.cs.berkeley.edu/jenkins/view/Spark/job/Spark-1.3-SBT/38/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=centos/#showFailuresLink
 {code}
 org.apache.spark.metrics.InputOutputMetricsSuite.input metrics with mixed 
 read method
 Failing for the past 13 builds (Since Failed#26 )
 Took 48 sec.
 Error Message
 2030 did not equal 6496
 Stacktrace
 sbt.ForkMain$ForkError: 2030 did not equal 6496
   at 
 org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
   at 
 org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
   at 
 org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
   at 
 org.apache.spark.metrics.InputOutputMetricsSuite$$anonfun$9.apply$mcV$sp(InputOutputMetricsSuite.scala:135)
   at 
 org.apache.spark.metrics.InputOutputMetricsSuite$$anonfun$9.apply(InputOutputMetricsSuite.scala:113)
   at 
 org.apache.spark.metrics.InputOutputMetricsSuite$$anonfun$9.apply(InputOutputMetricsSuite.scala:113)
   at 
 org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   at org.scalatest.Transformer.apply(Transformer.scala:22)
   at org.scalatest.Transformer.apply(Transformer.scala:20)
   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
   at 
 org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
   at 
 org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
   at 
 org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
   at 
 org.apache.spark.metrics.InputOutputMetricsSuite.org$scalatest$BeforeAndAfter$$super$runTest(InputOutputMetricsSuite.scala:46)
   at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
   at 
 org.apache.spark.metrics.InputOutputMetricsSuite.runTest(InputOutputMetricsSuite.scala:46)
   at 
 org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
   at 
 org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
   at 
 org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
   at 
 org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   at 
 org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
   at org.scalatest.Suite$class.run(Suite.scala:1424)
   at 
 org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
   at 
 org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
   at 
 

[jira] [Commented] (SPARK-5647) Output metrics do not show up for older hadoop versions (< 2.5)

2015-02-09 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14312818#comment-14312818
 ] 

Kostas Sakellis commented on SPARK-5647:


I'm not sure if this is possible with older hadoop. Need to do some 
investigation. We could possibly check the final file size after it has been 
written? [~sandyr] had some ideas when I talked to him about it.
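A minimal sketch of that fallback idea, purely illustrative (it assumes the write path knows the output path and has a Hadoop Configuration at hand):
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Possible fallback for hadoop < 2.5, where no bytes-written callback exists:
// look at the size of the committed output file once the task finishes writing.
def bytesWritten(outputPath: String, conf: Configuration): Long = {
  val path = new Path(outputPath)
  val fs = path.getFileSystem(conf)
  fs.getFileStatus(path).getLen
}
{code}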

 Output metrics do not show up for older hadoop versions (< 2.5)
 ---

 Key: SPARK-5647
 URL: https://issues.apache.org/jira/browse/SPARK-5647
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Kostas Sakellis

 Need to add output metrics for hadoop < 2.5. 






[jira] [Created] (SPARK-5674) Spark Job Explain Plan Proof of Concept

2015-02-08 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-5674:
--

 Summary: Spark Job Explain Plan Proof of Concept
 Key: SPARK-5674
 URL: https://issues.apache.org/jira/browse/SPARK-5674
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Kostas Sakellis


This is just a prototype of creating an explain plan for a job. Code can be 
found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc The 
code was written very quickly, so it doesn't have any comments or tests and is 
probably buggy - hence it being a proof of concept.

*How to Use*

# {code}sc.explainOn / sc.explainOff{code} This will generate the explain 
plan and print it in the logs.
# {code}sc.enableExecution / sc.disableExecution{code} This will enable or 
disable execution of the job.

Using these two knobs a user can choose to print the explain plan and/or 
disable the running of the job if they only want to see the plan.


*Implementation*

This is only a prototype and it is by no means production ready. The code is 
pretty hacky in places and a few shortcuts were made just to get the prototype 
working.

The most interesting part of this commit is the ExecutionPlanner.scala 
class. This class creates its own private instance of the DAGScheduler and 
passes into it a NoopTaskScheduler. The NoopTaskScheduler receives the created 
TaskSets from the DAGScheduler and records the stage -> TaskSet mapping. The 
NoopTaskScheduler also creates fake CompletionEvents and sends them to the 
DAGScheduler to move the scheduling along. It is done this way so that we can 
use the DAGScheduler unmodified, thus reducing code divergence.

The rest of the code is about processing the information produced by the 
ExecutionPlanner, creating a DAG with a bunch of metadata, and printing it as a 
pretty ASCII drawing. For drawing the DAG, https://github.com/mdr/ascii-graphs 
is used. Again, this was just the easiest way to prototype.

*How is this different from RDD#toDebugString?*

The execution planner runs the job through the entire DAGScheduler, so we can 
collect some metrics that are not presently available in the debugString. For 
example, we can report the binary size of the task, which might be important if 
the closures are referencing large objects.

In addition, because we execute the scheduler code from an action, we can get a 
more accurate picture of where the stage boundaries and dependencies are. An action 
such as treeReduce will generate a number of stages that you can't get just by 
doing .toDebugString on the RDD.


*Limitations of this Implementation*

Because this is a prototype, there is a lot of lame stuff in this commit.

# All of the code in SparkContext in particular sucks. This adds some code in 
the runJob() call, and when it gets the plan it just writes it to the INFO log. 
We need to find a better way of exposing the plan to the caller so that they 
can print it, analyze it, etc. Maybe we can use implicits or something? Not sure 
how best to do this yet.
# Some of the actions will fail with exceptions because we are basically 
faking a runJob(). If you want to try this, it is best to just use count() 
instead of, say, collect(). This will get fixed when we fix 1).
# Because the ExplainPlanner creates its own DAGScheduler, there currently is 
no way to map the real stages to the explain plan stages. So if a user turns 
on explain plan and doesn't disable execution, we can't automatically add more 
metrics to the explain plan as they become available. The stageId in the plan 
and the stageId in the real scheduler will be different. This is important for 
when we add it to the web UI so users can track progress on the DAG.
# We are using https://github.com/mdr/ascii-graphs to draw the DAG - not sure 
if we want to depend on that project.

*Next Steps*

# It would be good to get a few people to take a look at the code, specifically 
at how the plan gets generated. Clone the branch and give it a try with some 
of your jobs.
# If the approach looks okay overall, I can put together a mini design doc and 
add some answers to the above limitations of this approach. 
# Feedback is most welcome.

*Example Code:*

{code}
sc.explainOn
sc.disableExecution

val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))
rdd.join(rdd2)
   .count()
{code}


*Example Output:*

{noformat}
EXPLAIN PLAN:

 +---+ +---+
 |   | |   |
 |Stage: 0 @ map | |Stage: 1 @ map |
 |   Tasks: 4| |   Tasks: 2|
 |   | |   |
 +---+ +---+
   | |
   v v
 +-+
 | |
 |Stage: 2 @ count |
 |Tasks: 4 |
 | |
 +-+

 STAGE DETAILS:
 --

 Stage: 0
  

[jira] [Created] (SPARK-5647) Output metrics do not show up for older hadoop versions (< 2.5)

2015-02-05 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-5647:
--

 Summary: Output metrics do not show up for older hadoop versions 
(< 2.5)
 Key: SPARK-5647
 URL: https://issues.apache.org/jira/browse/SPARK-5647
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Kostas Sakellis


Need to add output metrics for hadoop < 2.5. 






[jira] [Created] (SPARK-5645) Track local bytes read for shuffles - update UI

2015-02-05 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-5645:
--

 Summary: Track local bytes read for shuffles - update UI
 Key: SPARK-5645
 URL: https://issues.apache.org/jira/browse/SPARK-5645
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Kostas Sakellis


Currently we do not track the local bytes read for a shuffle read. The UI only 
shows the remote bytes read. This is pretty confusing to the user because:
1) In local mode all shuffle reads are local
2) the shuffle bytes written from the previous stage might not add up if there 
are some bytes that are read locally on the shuffle read side
3) With https://github.com/apache/spark/pull/4067 we display the total number 
of records so that won't line up with only showing the remote bytes read. 

I propose we track the remote and local bytes read separately. In the UI show 
the total bytes read and in brackets show the remote bytes read for a shuffle. 






[jira] [Created] (SPARK-5646) Record output metrics for cache

2015-02-05 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-5646:
--

 Summary: Record output metrics for cache
 Key: SPARK-5646
 URL: https://issues.apache.org/jira/browse/SPARK-5646
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Reporter: Kostas Sakellis


We currently show the input metrics when coming from the cache but we don't 
track/show the output metrics when we write to the cache






[jira] [Commented] (SPARK-5081) Shuffle write increases

2015-02-05 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14307584#comment-14307584
 ] 

Kostas Sakellis commented on SPARK-5081:


Can you add a sample of the code too?

 Shuffle write increases
 ---

 Key: SPARK-5081
 URL: https://issues.apache.org/jira/browse/SPARK-5081
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 1.2.0
Reporter: Kevin Jung

 The size of shuffle write showing in spark web UI is much different when I 
 execute same spark job with same input data in both spark 1.1 and spark 1.2. 
 At sortBy stage, the size of shuffle write is 98.1MB in spark 1.1 but 146.9MB 
 in spark 1.2. 
 I set the spark.shuffle.manager option to hash because its default value 
 changed, but spark 1.2 still writes more shuffle output than spark 1.1.
 It can increase disk I/O overhead exponentially as the input file gets bigger 
 and it causes the jobs take more time to complete. 
 In the case of about 100GB input, for example, the size of shuffle write is 
 39.7GB in spark 1.1 but 91.0GB in spark 1.2.
 spark 1.1
 ||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
 |9|saveAsTextFile| |1169.4KB| |
 |12|combineByKey| |1265.4KB|1275.0KB|
 |6|sortByKey| |1276.5KB| |
 |8|mapPartitions| |91.0MB|1383.1KB|
 |4|apply| |89.4MB| |
 |5|sortBy|155.6MB| |98.1MB|
 |3|sortBy|155.6MB| | |
 |1|collect| |2.1MB| |
 |2|mapValues|155.6MB| |2.2MB|
 |0|first|184.4KB| | |
 spark 1.2
 ||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
 |12|saveAsTextFile| |1170.2KB| |
 |11|combineByKey| |1264.5KB|1275.0KB|
 |8|sortByKey| |1273.6KB| |
 |7|mapPartitions| |134.5MB|1383.1KB|
 |5|zipWithIndex| |132.5MB| |
 |4|sortBy|155.6MB| |146.9MB|
 |3|sortBy|155.6MB| | |
 |2|collect| |2.0MB| |
 |1|mapValues|155.6MB| |2.2MB|
 |0|first|184.4KB| | |






[jira] [Commented] (SPARK-5557) Servlet API classes now missing after jetty shading

2015-02-05 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14308374#comment-14308374
 ] 

Kostas Sakellis commented on SPARK-5557:


[~pwendell] recommended this which did the trick:
{code}
diff --git a/core/pom.xml b/core/pom.xml
index 2dc5f74..f03ec47 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -132,6 +132,11 @@
       <artifactId>jetty-servlet</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty.orbit</groupId>
+      <artifactId>javax.servlet</artifactId>
+      <version>3.0.0.v201112011016</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.commons</groupId>
{code}

 Servlet API classes now missing after jetty shading
 ---

 Key: SPARK-5557
 URL: https://issues.apache.org/jira/browse/SPARK-5557
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: Guoqiang Li
Priority: Blocker

 the log:
 {noformat}
 15/02/03 19:06:39 INFO spark.HttpServer: Starting HTTP Server
 Exception in thread "main" java.lang.NoClassDefFoundError: 
 javax/servlet/http/HttpServletResponse
   at 
 org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:75)
   at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62)
   at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62)
   at 
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1774)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
   at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1765)
   at org.apache.spark.HttpServer.start(HttpServer.scala:62)
   at org.apache.spark.repl.SparkIMain.<init>(SparkIMain.scala:130)
   at 
 org.apache.spark.repl.SparkILoop$SparkILoopInterpreter.<init>(SparkILoop.scala:185)
   at 
 org.apache.spark.repl.SparkILoop.createInterpreter(SparkILoop.scala:214)
   at 
 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:946)
   at 
 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)
   at 
 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)
   at 
 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
   at 
 org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:942)
   at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1039)
   at org.apache.spark.repl.Main$.main(Main.scala:31)
   at org.apache.spark.repl.Main.main(Main.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:403)
   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)
   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException: 
 javax.servlet.http.HttpServletResponse
   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
   at java.security.AccessController.doPrivileged(Native Method)
   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
   ... 25 more
 {noformat}






[jira] [Commented] (SPARK-4630) Dynamically determine optimal number of partitions

2015-01-20 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14284341#comment-14284341
 ] 

Kostas Sakellis commented on SPARK-4630:


I agree that this should be built without assuming SchemaRDD. Like Dryad and Tez 
(which is basically Dryad), we should be able to use runtime statistics (as 
opposed to metastore stats) to compute the optimal number of partitions. 

I'm no Dryad expert but have simply read through their papers:
1) Dryad: http://research.microsoft.com/en-us/projects/dryad/eurosys07.pdf
2) DryadLinq: 
http://research.microsoft.com/en-us/projects/DryadLINQ/DryadLINQ.pdf
3) Optimus: http://research.microsoft.com/pubs/185714/Optimus.pdf

The DryadLinq paper has a small section on dynamic partitioning. Section 4.2.2:
{quote}
Dynamic data partitioning sets the number of vertices in each stage (i.e., 
the number of partitions of each dataset) at run time based on the size of its 
input data. Traditional databases usually estimate dataset sizes statically, 
but these estimates can be very inaccurate, for example in the presence of 
correlated queries. DryadLINQ supports dynamic hash and range partitions—for 
range partitions both the number of partitions and the partitioning key 
ranges are determined at run time by sampling the input dataset.
{quote}

The Optimus paper talks about more optimizations they did in their system that 
runs on top of Dryad. There are a lot of optimizations, but dynamic partitioning 
is discussed in Section 3.1. They describe creating a set of sampled 
histograms, one for each dependent partition, and then depending on the 
operation choose a combining strategy for the statistics. For example, for 
joins they do the product of the histograms. Using the stats from the 
histograms they determine how many vertices (partitions) to add to the graph 
processing. The paper they reference for creating the sampling histogram is 
http://www.mathcs.emory.edu/~cheung/papers/StreamDB/Histogram/1998-Chaudhuri-Histo.pdf
 - I haven't read it yet. They don't really get into how they bootstrap this - 
sampling the original datasources stored in the filesystem.

From what I can tell, [~lianhuiwang]'s patch assumes that all records are the 
same size since it solely looks at the map status and hadoop input sizes. I 
don't think this is good enough to make intelligent decisions as you also need 
to look at the record sizes to be able to prevent skew.

The partial DAG execution described in the Shark paper is similar to what Dryad 
does. [~rxin], why was this not pushed down to core Spark? Partial DAG 
execution could allow us to have a number of runtime optimizations that are 
currently not possible. 
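To make the size-based heuristic concrete, here is a rough sketch of the arithmetic (illustrative only, and deliberately bytes-based, i.e. it has exactly the uniform-record-size limitation discussed above):
{code}
// Given the shuffle output size reported by each map task, pick a partition
// count so that each reduce partition lands near a target size (e.g. 128 MB).
def suggestNumPartitions(mapOutputBytes: Seq[Long],
                         targetPartitionBytes: Long = 128L * 1024 * 1024): Int = {
  val totalBytes = mapOutputBytes.sum
  math.max(1, math.ceil(totalBytes.toDouble / targetPartitionBytes).toInt)
}

// e.g. 50 map tasks emitting ~100 MB each => 40 reduce partitions
// suggestNumPartitions(Seq.fill(50)(100L * 1024 * 1024))
{code}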

 Dynamically determine optimal number of partitions
 --

 Key: SPARK-4630
 URL: https://issues.apache.org/jira/browse/SPARK-4630
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Kostas Sakellis
Assignee: Kostas Sakellis

 Partition sizes play a big part in how fast stages execute during a Spark 
 job. There is a direct relationship between the size of partitions to the 
 number of tasks - larger partitions, fewer tasks. For better performance, 
 Spark has a sweet spot for how large partitions should be that get executed 
 by a task. If partitions are too small, then the user pays a disproportionate 
 cost in scheduling overhead. If the partitions are too large, then task 
 execution slows down due to gc pressure and spilling to disk.
 To increase performance of jobs, users often hand optimize the number(size) 
 of partitions that the next stage gets. Factors that come into play are:
 Incoming partition sizes from previous stage
 number of available executors
 available memory per executor (taking into account 
 spark.shuffle.memoryFraction)
 Spark has access to this data and so should be able to automatically do the 
 partition sizing for the user. This feature can be turned off/on with a 
 configuration option. 
 To make this happen, we propose modifying the DAGScheduler to take into 
 account partition sizes upon stage completion. Before scheduling the next 
 stage, the scheduler can examine the sizes of the partitions and determine 
 the appropriate number of tasks to create. Since this change requires 
 non-trivial modifications to the DAGScheduler, a detailed design doc will be 
 attached before proceeding with the work.






[jira] [Created] (SPARK-5225) Support coalesced Input Metrics from different sources

2015-01-13 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-5225:
--

 Summary: Support coalesced Input Metrics from different sources
 Key: SPARK-5225
 URL: https://issues.apache.org/jira/browse/SPARK-5225
 Project: Spark
  Issue Type: Bug
Reporter: Kostas Sakellis


Currently, if a task reads data from more than one block and the blocks come from 
different read methods, we ignore the bytes from the second read method. For example:
{noformat}
      CoalescedRDD
           |
         Task1
        /  |  \
  hadoop hadoop cached
{noformat}
If Task1 starts reading from the hadoop blocks first, then the input metrics 
for Task1 will only contain input metrics from the hadoop blocks and ignore the 
input metrics from the cached blocks. We need to change the way we collect input 
metrics so that it is not a single value but rather a collection of input 
metrics per task. 
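A rough sketch of the shape such a change could take (hypothetical types, not the actual TaskMetrics API):
{code}
// Hypothetical accumulation of per-read-method input metrics for one task,
// instead of a single value that the second read method silently overwrites.
sealed trait ReadMethod
case object Hadoop extends ReadMethod
case object Memory extends ReadMethod
case object Disk extends ReadMethod

final case class InputMetrics(readMethod: ReadMethod, bytesRead: Long)

final class TaskInputMetrics {
  private var metrics = Map.empty[ReadMethod, Long]

  def addBytesRead(method: ReadMethod, bytes: Long): Unit =
    metrics = metrics.updated(method, metrics.getOrElse(method, 0L) + bytes)

  // One entry per read method, so hadoop and cached reads no longer clobber each other.
  def all: Seq[InputMetrics] = metrics.toSeq.map { case (m, b) => InputMetrics(m, b) }

  def totalBytesRead: Long = metrics.values.sum
}
{code}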






[jira] [Updated] (SPARK-5225) Support coalesced Input Metrics from different sources

2015-01-13 Thread Kostas Sakellis (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Sakellis updated SPARK-5225:
---
Description: 
Currently, If task reads data from more than one block and it is from different 
read methods we ignore the second read method bytes. For example:
{noformat}
  CoalescedRDD
   | 
 Task1 
 /  |  \   
 hadoop  hadoop  cached
{noformat}
If Task1 starts reading from the hadoop blocks first, then the input metrics 
for Task1 will only contain input metrics from the hadoop blocks and ignore the 
input metrics from the cached blocks. We need to change the way we collect input 
metrics so that it is not a single value but rather a collection of input 
metrics per task. 

  was:
Currently, If task reads data from more than one block and it is from different 
read methods we ignore the second read method bytes. For example:
 CoalescedRDD
| 
Task1 
  / |\
/   |  \   
  hadoop   hadoop  cached

if Task1 starts reading from the hadoop blocks first, then the input metrics 
for Task1 will only contain input metrics from the hadoop blocks and ignre the 
input metrics from cached blocks. We need to change the way we collect input 
metrics so that it is not a single value but rather a collection of input 
metrics for a task. 


 Support coalesced Input Metrics from different sources
 -

 Key: SPARK-5225
 URL: https://issues.apache.org/jira/browse/SPARK-5225
 Project: Spark
  Issue Type: Bug
Reporter: Kostas Sakellis

 Currently, if a task reads data from more than one block and the blocks come from 
 different read methods, we ignore the bytes from the second read method. For example:
 {noformat}
   CoalescedRDD
| 
  Task1 
  /  |  \   
  hadoop  hadoop  cached
 {noformat}
 If Task1 starts reading from the hadoop blocks first, then the input metrics 
 for Task1 will only contain input metrics from the hadoop blocks and ignore 
 the input metrics from the cached blocks. We need to change the way we collect 
 input metrics so that it is not a single value but rather a collection of 
 input metrics per task. 






[jira] [Created] (SPARK-5190) Allow spark listeners to be added before spark context gets initialized.

2015-01-10 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-5190:
--

 Summary: Allow spark listeners to be added before spark context 
gets initialized.
 Key: SPARK-5190
 URL: https://issues.apache.org/jira/browse/SPARK-5190
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Kostas Sakellis


Currently, you need the SparkContext to add a SparkListener. But if you 
wait until the SparkContext gets created before adding your listener, you might 
miss events like blockManagerAdded or executorAdded. We should fix this so you 
can attach a listener to the SparkContext before it starts any initialization. 
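For illustration, one possible shape of the fix, along the lines of the spark.extraListeners setting that later Spark versions expose (the listener class below is hypothetical):
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded, SparkListenerExecutorAdded}

// Hypothetical listener that wants the executor/block-manager registration
// events that fire while the SparkContext is still starting up.
class StartupListener extends SparkListener {
  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit =
    println(s"block manager added: ${event.blockManagerId}")
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit =
    println(s"executor added: ${event.executorId}")
}

val conf = new SparkConf()
  .setAppName("listener-demo")
  .setMaster("local[2]")
  // Registered from configuration, so the listener is attached before any
  // startup events are posted, instead of via sc.addSparkListener afterwards.
  .set("spark.extraListeners", classOf[StartupListener].getName)

val sc = new SparkContext(conf)
{code}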






[jira] [Created] (SPARK-4874) Report number of records read/written in a task

2014-12-17 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-4874:
--

 Summary: Report number of records read/written in a task
 Key: SPARK-4874
 URL: https://issues.apache.org/jira/browse/SPARK-4874
 Project: Spark
  Issue Type: Improvement
Reporter: Kostas Sakellis


This metric will help us find key skew using the WebUI






[jira] [Commented] (SPARK-4874) Report number of records read/written in a task

2014-12-17 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14250304#comment-14250304
 ] 

Kostas Sakellis commented on SPARK-4874:


I'm working on this.

 Report number of records read/written in a task
 ---

 Key: SPARK-4874
 URL: https://issues.apache.org/jira/browse/SPARK-4874
 Project: Spark
  Issue Type: Improvement
Reporter: Kostas Sakellis

 This metric will help us find key skew using the WebUI






[jira] [Commented] (SPARK-4857) Add Executor Events to SparkListener

2014-12-15 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14247578#comment-14247578
 ] 

Kostas Sakellis commented on SPARK-4857:


I'll work on this.

 Add Executor Events to SparkListener
 

 Key: SPARK-4857
 URL: https://issues.apache.org/jira/browse/SPARK-4857
 Project: Spark
  Issue Type: Improvement
Reporter: Kostas Sakellis

 We need to add events to the SparkListener to indicate an executor has been 
 added or removed with corresponding information. 






[jira] [Created] (SPARK-4857) Add Executor Events to SparkListener

2014-12-15 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-4857:
--

 Summary: Add Executor Events to SparkListener
 Key: SPARK-4857
 URL: https://issues.apache.org/jira/browse/SPARK-4857
 Project: Spark
  Issue Type: Improvement
Reporter: Kostas Sakellis


We need to add events to the SparkListener to indicate an executor has been 
added or removed with corresponding information. 






[jira] [Created] (SPARK-4843) Squash ExecutorRunnable and ExecutorRunnableUtil hierarchy in yarn module

2014-12-14 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-4843:
--

 Summary: Squash ExecutorRunnable and ExecutorRunnableUtil 
hierarchy in yarn module
 Key: SPARK-4843
 URL: https://issues.apache.org/jira/browse/SPARK-4843
 Project: Spark
  Issue Type: Improvement
Reporter: Kostas Sakellis


ExecutorRunnableUtil is a parent of ExecutorRunnable because of the yarn-alpha 
and yarn-stable split. Now that yarn-alpha is gone, we can squash the 
unnecessary hierarchy.  






[jira] [Created] (SPARK-4774) Make HiveFromSpark example more portable

2014-12-05 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-4774:
--

 Summary: Make HiveFromSpark example more portable
 Key: SPARK-4774
 URL: https://issues.apache.org/jira/browse/SPARK-4774
 Project: Spark
  Issue Type: Bug
  Components: Examples
Reporter: Kostas Sakellis


The HiveFromSpark example needs the kv1.txt file to be in a specific location: 
SPARK_HOME/examples/src/main/resources/kv1.txt, which assumes you have the 
source tree checked out. 

A more portable way is to copy kv1.txt to a temporary file that gets 
cleaned up when the JVM shuts down. This would allow us to run this example 
outside of a source tree.
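A sketch of the more portable approach, assuming kv1.txt is packaged as a classpath resource in the examples jar (the resource path and object name are illustrative):
{code}
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object Kv1Resource {
  // Copy the bundled resource to a temp file that the JVM deletes on shutdown,
  // so the example no longer depends on a checked-out source tree.
  def materialize(): File = {
    val in = getClass.getResourceAsStream("/kv1.txt")
    val tmp = File.createTempFile("kv1", ".txt")
    tmp.deleteOnExit()
    Files.copy(in, tmp.toPath, StandardCopyOption.REPLACE_EXISTING)
    in.close()
    tmp
  }
}
{code}
The example would then LOAD DATA from the temp file's absolute path instead of from SPARK_HOME/examples/src/main/resources/kv1.txt.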






[jira] [Updated] (SPARK-4774) Make HiveFromSpark example more portable

2014-12-05 Thread Kostas Sakellis (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Sakellis updated SPARK-4774:
---
Component/s: SQL

 Make HiveFromSpark example more portable
 

 Key: SPARK-4774
 URL: https://issues.apache.org/jira/browse/SPARK-4774
 Project: Spark
  Issue Type: Bug
  Components: Examples, SQL
Reporter: Kostas Sakellis

 The HiveFromSpark example needs the kv1.txt file to be in a specific 
 location: SPARK_HOME/examples/src/main/resources/kv1.txt, which assumes you 
 have the source tree checked out. 
 A more portable way is to copy kv1.txt to a temporary file that gets 
 cleaned up when the JVM shuts down. This would allow us to run this example 
 outside of a source tree.






[jira] [Created] (SPARK-4630) Dynamically determine optimal number of partitions

2014-11-26 Thread Kostas Sakellis (JIRA)
Kostas Sakellis created SPARK-4630:
--

 Summary: Dynamically determine optimal number of partitions
 Key: SPARK-4630
 URL: https://issues.apache.org/jira/browse/SPARK-4630
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Kostas Sakellis


Partition sizes play a big part in how fast stages execute during a Spark job. 
There is a direct relationship between the size of partitions to the number of 
tasks - larger partitions, fewer tasks. For better performance, Spark has a 
sweet spot for how large partitions should be that get executed by a task. If 
partitions are too small, then the user pays a disproportionate cost in 
scheduling overhead. If the partitions are too large, then task execution slows 
down due to gc pressure and spilling to disk.

To increase performance of jobs, users often hand-optimize the number (size) of 
partitions that the next stage gets. Factors that come into play are:
* incoming partition sizes from the previous stage
* number of available executors
* available memory per executor (taking into account spark.shuffle.memoryFraction)

Spark has access to this data and so should be able to automatically do the 
partition sizing for the user. This feature can be turned off/on with a 
configuration option. 

To make this happen, we propose modifying the DAGScheduler to take into account 
partition sizes upon stage completion. Before scheduling the next stage, the 
scheduler can examine the sizes of the partitions and determine the appropriate 
number of tasks to create. Since this change requires non-trivial modifications to 
the DAGScheduler, a detailed design doc will be attached before proceeding with 
the work.






[jira] [Commented] (SPARK-2450) Provide link to YARN executor logs on UI

2014-11-12 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14209350#comment-14209350
 ] 

Kostas Sakellis commented on SPARK-2450:


I'll pick up the work on this.

 Provide link to YARN executor logs on UI
 

 Key: SPARK-2450
 URL: https://issues.apache.org/jira/browse/SPARK-2450
 Project: Spark
  Issue Type: Improvement
  Components: Web UI, YARN
Affects Versions: 1.0.0
Reporter: Bill Havanki
Priority: Minor

 When running under YARN, provide links to executor logs from the web UI to 
 avoid the need to drill down through the YARN UI.






[jira] [Commented] (SPARK-4092) Input metrics don't work for coalesce()'d RDD's

2014-11-11 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14207190#comment-14207190
 ] 

Kostas Sakellis commented on SPARK-4092:


[~aash], yes this should solve a superset of the same problems that SPARK-2630 
aims to fix. I say superset because https://github.com/apache/spark/pull/3120 
also includes a similar fix when hadoop 2.5 is used with the bytesReadCallback. 


 Input metrics don't work for coalesce()'d RDD's
 ---

 Key: SPARK-4092
 URL: https://issues.apache.org/jira/browse/SPARK-4092
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Patrick Wendell
Assignee: Kostas Sakellis
Priority: Critical

 In every case where we set input metrics (from both Hadoop and block storage) 
 we currently assume that exactly one input partition is computed within the 
 task. This is not a correct assumption in the general case. The main example 
 in the current API is coalesce(), but user-defined RDD's could also be 
 affected.
 To deal with the most general case, we would need to support the notion of a 
 single task having multiple input sources. A more surgical and less general 
 fix is to simply go to HadoopRDD and check if there are already inputMetrics 
 defined for the task with the same type. If there are, then merge in the 
 new data rather than blowing away the old one.
 This wouldn't cover the case where, e.g., a single task has input from both 
 on-disk and in-memory blocks. It _would_ cover the case where someone calls 
 coalesce on a HadoopRDD... which is more common.






[jira] [Commented] (SPARK-4079) Snappy bundled with Spark does not work on older Linux distributions

2014-10-31 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14192188#comment-14192188
 ] 

Kostas Sakellis commented on SPARK-4079:


yes, I'm taking this over from Marcelo.

 Snappy bundled with Spark does not work on older Linux distributions
 

 Key: SPARK-4079
 URL: https://issues.apache.org/jira/browse/SPARK-4079
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.0
Reporter: Marcelo Vanzin

 This issue has existed at least since 1.0, but has been made worse by 1.1 
 since snappy is now the default compression algorithm. When trying to use it 
 on a CentOS 5 machine, for example, you'll get something like this:
 {noformat}
   java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
 org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:319)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:226)
at org.xerial.snappy.Snappy.<clinit>(Snappy.java:48)
at 
 org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
at 
 org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
at 
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
...
Caused by: java.lang.UnsatisfiedLinkError: 
 /tmp/snappy-1.0.5.3-af72bf3c-9dab-43af-a662-f9af657f06b1-libsnappyjava.so: 
 /usr/lib64/libstdc++.so.6: version `GLIBCXX_3.4.9' not found (required by 
 /tmp/snappy-1.0.5.3-af72bf3c-9dab-43af-a662-f9af657f06b1-libsnappyjava.so)
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary1(ClassLoader.java:1957)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1882)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1843)
at java.lang.Runtime.load0(Runtime.java:795)
at java.lang.System.load(System.java:1061)
at 
 org.xerial.snappy.SnappyNativeLoader.load(SnappyNativeLoader.java:39)
... 29 more
 {noformat}
 There are two approaches I can see here (well, 3):
 * Declare CentOS 5 (and similar OSes) not supported, although that would suck 
 for the people who are still on it and already use Spark
 * Fallback to another compression codec if Snappy cannot be loaded
 * Ask the Snappy guys to compile the library on an older OS...
 I think the second would be the best compromise.






[jira] [Commented] (SPARK-4092) Input metrics don't work for coalesce()'d RDD's

2014-10-30 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14190974#comment-14190974
 ] 

Kostas Sakellis commented on SPARK-4092:


The current patch I'm working on does the simplest thing to address this issue. 
In the hadoop RDDs and the cache manager, if the task already has input metrics of 
the same read method, it will increment them instead of overriding them. This simple 
solution should handle the common case of:
{code}
sc.textFile(..).coalesce(5).collect()
{code}
In addition it will cover the situation where blocks are coming from the cache. 

What it won't handle properly is if coalesce (or other rdds with similar 
properties) reads from multiple blocks of mixed read methods (memory vs. 
hadoop). In that case, one input metric will override the other. We have 
several options:
# We create a MIXED readMethod and if we see input metrics from different 
methods, we change to MIXED. This will lose some information because now we 
don't know where the blocks were read from.
# We store multiple inputMetrics for each TaskContext. Up the stack (eg. 
JsonProtocol) we send the array of InputMetrics to the caller. We have to worry 
about backwards compatibility in that case so we can't just remove the single 
inputMetric. We might have to send back a MIXED metric and in addition the 
array for newer clients.
# We punt on this issue for now since it can be argued this case is not common.
What are people's thoughts on this?
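For what it's worth, a tiny sketch of what option 1 could look like (hypothetical types, not the actual TaskMetrics code):
{code}
// Hypothetical merge rule for option 1: metrics from different read methods
// collapse to a MIXED method instead of one silently overriding the other.
sealed trait ReadMethod
case object HadoopRead extends ReadMethod
case object MemoryRead extends ReadMethod
case object MixedRead extends ReadMethod

final case class InputMetrics(readMethod: ReadMethod, bytesRead: Long) {
  def merge(other: InputMetrics): InputMetrics = {
    val method = if (readMethod == other.readMethod) readMethod else MixedRead
    InputMetrics(method, bytesRead + other.bytesRead)
  }
}
{code}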



 Input metrics don't work for coalesce()'d RDD's
 ---

 Key: SPARK-4092
 URL: https://issues.apache.org/jira/browse/SPARK-4092
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Patrick Wendell
Assignee: Kostas Sakellis
Priority: Critical

 In every case where we set input metrics (from both Hadoop and block storage) 
 we currently assume that exactly one input partition is computed within the 
 task. This is not a correct assumption in the general case. The main example 
 in the current API is coalesce(), but user-defined RDD's could also be 
 affected.
 To deal with the most general case, we would need to support the notion of a 
 single task having multiple input sources. A more surgical and less general 
 fix is to simply go to HadoopRDD and check if there are already inputMetrics 
 defined for the task with the same type. If there are, then merge in the 
 new data rather than blowing away the old one.
 This wouldn't cover the case where, e.g., a single task has input from both 
 on-disk and in-memory blocks. It _would_ cover the case where someone calls 
 coalesce on a HadoopRDD... which is more common.






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-10-23 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14182110#comment-14182110
 ] 

Kostas Sakellis commented on SPARK-1239:


Apologies for not commenting on this JIRA sooner but since I'm new to Spark, it 
has taken a bit of time to wrap my head around how the different schedulers 
(DAG and Task) interoperate. I'm currently investigating completely removing 
the MapOutputTracker class. Instead, the DAGScheduler can push down the 
required map statuses to the next stage. I'm thinking of modifying 
DAGScheduler.submitMissingTasks(..) to take in (or query) the map statuses of the 
previously completed stage. When a new ShuffleMapTask gets created, we pass the 
filtered map statuses necessary for that task. The map status data can be stored 
in the TaskContext and used in the BlockStoreShuffleFetcher when block data is 
being read (which currently uses the MapOutputTracker). What I'm investigating 
currently is how to filter the data when I create the ShuffleMapTask - the 
BlockStoreShuffleFetcher uses a shuffleId which I don't seem to have access to 
in the DAGScheduler.

I haven't yet written any code to test this out so if anyone has any concerns 
please let me know. Also, [~joshrosen] pointed out that the size of the map 
output structure even after it has been filtered could still be very large. 
This is worth investigating.

 Don't fetch all map output statuses at each reducer during shuffles
 ---

 Key: SPARK-1239
 URL: https://issues.apache.org/jira/browse/SPARK-1239
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 1.0.2, 1.1.0
Reporter: Patrick Wendell
Assignee: Josh Rosen

 Instead we should modify the way we fetch map output statuses to take both a 
 mapper and a reducer - or we should just piggyback the statuses on each task. 






[jira] [Commented] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

2014-09-05 Thread Kostas Sakellis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14123883#comment-14123883
 ] 

Kostas Sakellis commented on SPARK-1239:


[~pwendell] I'd like to take a crack at this since it is affecting one of our 
customers. 

 Don't fetch all map output statuses at each reducer during shuffles
 ---

 Key: SPARK-1239
 URL: https://issues.apache.org/jira/browse/SPARK-1239
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: Patrick Wendell
Assignee: Andrew Or
 Fix For: 1.1.0


 Instead we should modify the way we fetch map output statuses to take both a 
 mapper and a reducer - or we should just piggyback the statuses on each task. 


