Adding/Using More Resolution Types on JIRA

2015-05-12 Thread Patrick Wendell
In Spark we sometimes close issues as something other than Fixed,
and this is an important part of maintaining our JIRA.

The current resolution types we use are the following:

Won't Fix - bug fix or (more often) feature we don't want to add
Invalid - issue is underspecified or not appropriate for a JIRA issue
Duplicate - duplicate of another JIRA
Cannot Reproduce - bug that could not be reproduced
Not A Problem - issue purports to represent a bug, but does not

I would like to propose adding a few new resolutions. This will
require modifying the ASF JIRA, but infra said they are open to
proposals as long as they are considered of broad interest.

My issue with the current set of resolutions is that Won't Fix is a
big catch-all we use for many different things. Most often it's used
for things that aren't even bugs, even though it has Fix in the name.
I'm proposing adding:

Inactive - A feature or bug that has had no activity from users or
developers in a long time
Out of Scope - A feature proposal that is not in scope given the project's goals
Later - A feature not on the immediate roadmap, but potentially of
interest longer term (this one already exists, I'm just proposing to
start using it)

I am in no way proposing changes to the decision-making model around
JIRAs, notably that it is consensus-based and that all resolutions
are considered tentative and fully reversible.

The benefits I see of this change would be the following:
1. Inactive: A way to clear out inactive/dead JIRAs without
indicating a decision has been made one way or the other.
2. Out of Scope: It explains closing out-of-scope features more
clearly than the generic Won't Fix. It also makes it clearer to
future contributors what is considered in scope for Spark.
3. Later: A way to signal that issues aren't targeted for a near-term
version. This would help avoid the mess we have now, with 200+ issues
targeted at each version and Target Version being a very poor
indicator of the actual roadmap. An alternative on this one is to
have a version called Later or Parking Lot, but not close the issues.

Any thoughts on this?

- Patrick




Re: Adding/Using More Resolution Types on JIRA

2015-05-12 Thread Nicholas Chammas
I tend to find that any large project has a lot of walking dead JIRAs, and
pretending they are simply Open causes problems. Any state is better for
these, so I favor this.

Agreed.


   1. Inactive: A way to clear out inactive/dead JIRAs without
   indicating a decision has been made one way or the other.

 This is a good idea, and perhaps the process of closing JIRAs as Inactive
can be automated. If *nothing* about a JIRA has changed in 12 months or
more (e.g. current oldest open Spark issue; dates to Aug 2013: SPARK-867
https://issues.apache.org/jira/browse/SPARK-867), perhaps a bot can mark
it as such for us. (Here’s a list of stale issues
https://issues.apache.org/jira/browse/SPARK-867?jql=project%20=%20SPARK%20AND%20resolution%20=%20Unresolved%20AND%20updated%20%3C=%20-26w%20ORDER%20BY%20updated%20ASC
).

This doesn’t mean the issue is invalid or won’t be addressed, but it gets
it out of the “Open” queue, which ideally should be a high churn queue
(e.g. stuff doesn’t stay in there forever).
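For reference, a bot like that could drive off the public JIRA REST search endpoint using the same JQL as the stale-issue link above. A minimal, read-only sketch (actually resolving issues would need an authenticated transition call, which is not shown):

import java.net.URLEncoder
import scala.io.Source

// Read-only sketch of the query a stale-issue bot might run against ASF JIRA.
// The JQL mirrors the stale-issue link above; JSON parsing is left out.
object StaleSparkIssues {
  def main(args: Array[String]): Unit = {
    val jql = "project = SPARK AND resolution = Unresolved AND updated <= -26w ORDER BY updated ASC"
    val url = "https://issues.apache.org/jira/rest/api/2/search?fields=key,updated&maxResults=100&jql=" +
      URLEncoder.encode(jql, "UTF-8")
    val response = Source.fromURL(url).mkString
    println(response.take(500)) // raw JSON listing candidate issues
  }
}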

Nick
​

On Tue, May 12, 2015 at 4:49 AM Sean Owen so...@cloudera.com wrote:

 I tend to find that any large project has a lot of walking dead JIRAs, and
 pretending they are simply Open causes problems. Any state is better for
 these, so I favor this.

 The possible objection is that this will squash or hide useful issues, but
 in practice we have the opposite problem. Resolved issues are still
 searchable by default, and, people aren't shy about opening duplicates
 anyway. At least the semantics of Later do not discourage a diligent searcher
 from considering commenting on and reopening such an archived JIRA.

 Patrick this could piggy back on INFRA-9513.

 As a corollary I would welcome deciding that Target Version should be used
 more narrowly to mean 'I really mean to help resolve this for the indicated
 version'. Setting it to a future version just to mean Later should instead
 turn into resolving the JIRA.

 Last: if JIRAs are regularly ice-boxed this way, I think it should trigger
 some reflection. Why are these JIRAs going nowhere? For completely normal
 reasons or does it mean too many TODOs are filed and forgotten? That's no
 comment on the current state, just something to watch.

 So: yes I like the idea.
 On May 12, 2015 8:50 AM, Patrick Wendell pwend...@gmail.com wrote:

  In Spark we sometimes close issues as something other than Fixed,
  and this is an important part of maintaining our JIRA.
 
  The current resolution types we use are the following:
 
  Won't Fix - bug fix or (more often) feature we don't want to add
  Invalid - issue is underspecified or not appropriate for a JIRA issue
  Duplicate - duplicate of another JIRA
  Cannot Reproduce - bug that could not be reproduced
  Not A Problem - issue purports to represent a bug, but does not
 
  I would like to propose adding a few new resolutions. This will
  require modifying the ASF JIRA, but infra said they are open to
  proposals as long as they are considered of broad interest.
 
  My issue with the current set of resolutions is that Won't Fix is a
  big catch-all we use for many different things. Most often it's used
  for things that aren't even bugs, even though it has Fix in the name.
  I'm proposing adding:
 
  Inactive - A feature or bug that has had no activity from users or
  developers in a long time
  Out of Scope - A feature proposal that is not in scope given the project's
  goals
  Later - A feature not on the immediate roadmap, but potentially of
  interest longer term (this one already exists, I'm just proposing to
  start using it)
 
  I am in no way proposing changes to the decision-making model around
  JIRAs, notably that it is consensus-based and that all resolutions
  are considered tentative and fully reversible.
 
  The benefits I see of this change would be the following:
  1. Inactive: A way to clear out inactive/dead JIRAs without
  indicating a decision has been made one way or the other.
  2. Out of Scope: It explains closing out-of-scope features more
  clearly than the generic Won't Fix. It also makes it clearer to
  future contributors what is considered in scope for Spark.
  3. Later: A way to signal that issues aren't targeted for a near-term
  version. This would help avoid the mess we have now, with 200+ issues
  targeted at each version and Target Version being a very poor
  indicator of the actual roadmap. An alternative on this one is to
  have a version called Later or Parking Lot, but not close the issues.
 
  Any thoughts on this?
 
  - Patrick
 
 
 



s3 vfs on Mesos Slaves

2015-05-12 Thread Stephen Carman
We have a small Mesos cluster, and the slaves need to have a vfs set up on them
so that they can pull down the data they need from S3 when Spark runs.

There doesn't seem to be any obvious guidance online on how to do this, or how
to accomplish it easily. Does anyone have best practices or ideas about how to
do this?

An example stack trace from when a job is run on the Mesos cluster appears below.

Any idea how to get this going? Maybe by somehow bootstrapping Spark when the
job runs, or something similar?

Thanks,
Steve


java.io.IOException: Unsupported scheme s3n for URI s3n://removed
at com.coldlight.ccc.vfs.NeuronPath.toPath(NeuronPath.java:43)
at 
com.coldlight.neuron.data.ClquetPartitionedData.makeInputStream(ClquetPartitionedData.java:465)
at 
com.coldlight.neuron.data.ClquetPartitionedData.access$200(ClquetPartitionedData.java:42)
at 
com.coldlight.neuron.data.ClquetPartitionedData$Iter.init(ClquetPartitionedData.java:330)
at 
com.coldlight.neuron.data.ClquetPartitionedData.compute(ClquetPartitionedData.java:304)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/12 13:57:51 ERROR Executor: Exception in task 0.1 in stage 0.0 (TID 1)
java.lang.RuntimeException: java.io.IOException: Unsupported scheme s3n for URI 
s3n://removed
at 
com.coldlight.neuron.data.ClquetPartitionedData.compute(ClquetPartitionedData.java:307)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unsupported scheme s3n for URI s3n://removed
at com.coldlight.ccc.vfs.NeuronPath.toPath(NeuronPath.java:43)
at 
com.coldlight.neuron.data.ClquetPartitionedData.makeInputStream(ClquetPartitionedData.java:465)
at 
com.coldlight.neuron.data.ClquetPartitionedData.access$200(ClquetPartitionedData.java:42)
at 
com.coldlight.neuron.data.ClquetPartitionedData$Iter.init(ClquetPartitionedData.java:330)
at 
com.coldlight.neuron.data.ClquetPartitionedData.compute(ClquetPartitionedData.java:304)
... 8 more
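
For reference, when plain Spark jobs (rather than a custom vfs layer like the one in the trace above) need to read s3n:// paths from the executors, the usual approach is to ship the S3 credentials through the Hadoop configuration and make sure the S3 connector jars are on the executor classpath. A rough sketch with placeholder keys and bucket:

import org.apache.spark.{SparkConf, SparkContext}

// General sketch (it may not apply to the custom vfs layer in the trace
// above): supply S3 credentials through the Hadoop configuration that ships
// with the job. Keys and bucket are placeholders; the S3 connector jars
// (jets3t / hadoop-aws) must also be on the executor classpath.
object S3nAccessSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("s3n-sketch"))
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")
    println(sc.textFile("s3n://your-bucket/path/part-*").count())
    sc.stop()
  }
}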



Sharing memory across applications/integration

2015-05-12 Thread Alexey Goncharuk
Hello Spark community,

I am currently trying to implement a proof-of-concept RDD that will allow
integrating Apache Spark and Apache Ignite (incubating) [1]. My original
idea was to embed an Ignite node in Spark's worker process so that user
code has direct access to the in-memory data, which gives the best
performance without any need to explicitly load data into Spark.

However, after looking at the documentation and the following questions on
the user list [2], [3], I realized that it might be impossible to implement.

So can anybody in the community clarify or point me to the documentation
regarding the following questions:

   - Does the worker spawn a new process for each application? Is there a way
   for workers to reuse the same process across different Spark contexts?
   - Is there a way to embed a worker in a user process?
   - Is there a way to attach user logic to worker lifecycle
   events (initialization/destruction)?

Thanks,
Alexey



[1] http://ignite.incubator.apache.org/
[2]
http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-Spark-Masters-Zk-Workers-SparkContext-App-in-single-JVM-clustered-sorta-for-symmetric-depl-td17711.html
[3]
http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-memory-across-applications-td11845.html
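
For context on where such integration code would run, here is a bare-bones custom RDD skeleton (the store access below is only a stub standing in for an embedded Ignite node): getPartitions executes on the driver, while compute executes inside the executor process that Spark starts for the application.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Skeleton of a data-source RDD, only to show where integration code runs:
// getPartitions is called on the driver, while compute runs inside the
// executor process launched for the application. An embedded store node or
// client would be created lazily in compute; the stub body stands in for that.
case class StorePartition(index: Int) extends Partition

class ExternalStoreRDD(sc: SparkContext, numSlices: Int)
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    (0 until numSlices).map(i => StorePartition(i): Partition).toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    // Executor-side: this is where an embedded node/client would live.
    Iterator(s"data for partition ${split.index}, fetched on the executor")
  }
}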


Re: Change for submitting to yarn in 1.3.1

2015-05-12 Thread Marcelo Vanzin
On Tue, May 12, 2015 at 11:34 AM, Kevin Markey kevin.mar...@oracle.com
wrote:

 I understand that SparkLauncher was supposed to address these issues, but
 it really doesn't.  Yarn already provides indirection and an arm's length
 transaction for starting Spark on a cluster. The launcher introduces yet
 another layer of indirection and dissociates the Yarn Client from the
 application that launches it.


Well, not fully. The launcher was supposed to solve how to launch a Spark
app programmatically, but in the first version nothing was added to
actually gather information about the running app. It's also limited in the
way it works because of Spark's limitations (one context per JVM, etc.).

Still, adding things like this is definitely in scope for the launcher
library; information such as the app ID can be useful to the code launching
the app, not just in YARN mode. We just have to find a clean
way to provide that information to the caller.


 I am still reading the newest code, and we are still researching options
 to move forward.  If there are alternatives, we'd like to know.


Super hacky, but if you launch Spark as a child process, you could parse its
stderr and get the app ID.
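
A rough sketch of that hack, assuming the YARN client prints a line containing the application ID (application_<timestamp>_<sequence>) to stderr; the command line and class name below are placeholders:

import scala.collection.mutable.ArrayBuffer
import scala.sys.process._

// Hacky sketch: run spark-submit as a child process and scrape the YARN
// application ID from its stderr. The command line, class name, and the
// assumption that the app ID shows up on stderr are illustrative only.
object SubmitAndGrabAppId {
  private val AppId = "(application_\\d+_\\d+)".r

  def main(args: Array[String]): Unit = {
    val stderrLines = ArrayBuffer.empty[String]
    val cmd = Seq("spark-submit", "--master", "yarn-cluster",
      "--class", "com.example.MyApp", "my-app.jar")    // placeholders
    val exitCode = cmd ! ProcessLogger(
      out => println(out),
      err => stderrLines.synchronized { stderrLines += err })
    val appId = stderrLines.flatMap(AppId.findFirstIn(_)).headOption
    println(s"spark-submit exited with $exitCode, captured appId = $appId")
  }
}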

-- 
Marcelo


[build system] brief downtime tomorrow morning (5-12-15, 7am PDT)

2015-05-12 Thread shane knapp
i will need to restart jenkins to finish a plugin install and resolve
https://issues.apache.org/jira/browse/SPARK-7561

this will be very brief, and i'll retrigger any errant jobs i kill.

please let me know if there are any comments/questions/concerns.

thanks!

shane


[IMPORTANT] Committers please update merge script

2015-05-12 Thread Patrick Wendell
Due to an ASF infrastructure change (bug?) [1], the default JIRA
resolution status has switched to Pending Closed. I've made a change
to our merge script to coerce the correct status of Fixed when
resolving [2]. Please update your merge script to the version on master.

I've manually corrected the JIRAs that were closed with the incorrect
status. Let me know if you have any issues.

[1] https://issues.apache.org/jira/browse/INFRA-9646

[2] 
https://github.com/apache/spark/commit/1b9e434b6c19f23a01e9875a3c1966cd03ce8e2d




Re: [PySpark DataFrame] When a Row is not a Row

2015-05-12 Thread Davies Liu
The class (called Row) for rows from Spark SQL is created on the fly and is
different from pyspark.sql.Row (which is a public API for users to create Rows).

The reason we did it this way is that we want better performance when
accessing the columns. Basically, the rows are just named tuples (called
`Row`).

--  
Davies Liu
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Tuesday, May 12, 2015 at 4:49 AM, Nicholas Chammas wrote:

 This is really strange.
  
 # Spark 1.3.1
 print type(results)
 <class 'pyspark.sql.dataframe.DataFrame'>

 a = results.take(1)[0]
 print type(a)
 <class 'pyspark.sql.types.Row'>

 print pyspark.sql.types.Row
 <class 'pyspark.sql.types.Row'>

 print type(a) == pyspark.sql.types.Row
 False

 print isinstance(a, pyspark.sql.types.Row)
 False
  
 If I set a as follows, then the type checks pass fine.
  
 a = pyspark.sql.types.Row('name')('Nick')
  
 Is this a bug? What can I do to narrow down the source?
  
 results is a massive DataFrame of spark-perf results.
  
 Nick
 ​
  
  




Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

2015-05-12 Thread fightf...@163.com
Hi there,
Which version are you using? Actually, the problem seems to be gone after we
changed our Spark version from 1.2.0 to 1.3.0.

Not sure which internal changes did it.

Best,
Sun.



fightf...@163.com
 
From: Night Wolf
Date: 2015-05-12 22:05
To: fightf...@163.com
CC: Patrick Wendell; user; dev
Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for 
large data sets
Seeing similar issues, did you find a solution? One would be to increase the 
number of partitions if you're doing lots of object creation. 

On Thu, Feb 12, 2015 at 7:26 PM, fightf...@163.com fightf...@163.com wrote:
Hi Patrick,

Really glad to get your reply.
Yes, we are doing group-by operations in our job. We know that this is common
for growTable when processing large data sets.

The question actually comes down to: is there any way to override the
initialCapacity specifically for our application? Does Spark provide a config
for achieving that?

We know this is tricky to get working. We just want to know how it could be
resolved, or whether there is some other channel we haven't covered.

Expecting your kind advice.

Thanks,
Sun.



fightf...@163.com
 
From: Patrick Wendell
Date: 2015-02-12 16:12
To: fightf...@163.com
CC: user; dev
Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for 
large data sets
The map will start with a capacity of 64, but will grow to accommodate
new data. Are you using the groupBy operator in Spark or are you using
Spark SQL's group by? This usually happens if you are grouping or
aggregating in a way that doesn't sufficiently condense the data
created from each input partition.
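
To illustrate the point about condensing data, here is a hedged sketch contrasting an aggregation with a map-side combine against one without (the path and names are made up):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: the same aggregation two ways. reduceByKey combines
// values map-side before the shuffle, so far fewer records reach the
// shuffle-side AppendOnlyMap; groupByKey ships every raw value into it.
object CondenseExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("condense-example"))
    val pairs = sc.textFile("hdfs:///tmp/words")      // placeholder path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))

    val condensed   = pairs.reduceByKey(_ + _)             // map-side combine
    val uncondensed = pairs.groupByKey().mapValues(_.sum)  // no map-side combine

    println(condensed.count() + " " + uncondensed.count())
    sc.stop()
  }
}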
 
- Patrick
 
On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com fightf...@163.com wrote:
 Hi,

 We still have no adequate solution for this issue. Any available
 analytical rules or hints would be appreciated.

 Thanks,
 Sun.

 
 fightf...@163.com


 From: fightf...@163.com
 Date: 2015-02-09 11:56
 To: user; dev
 Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for
 large data sets
 Hi,
 The problem still exists. Would any experts take a look at this?

 Thanks,
 Sun.

 
 fightf...@163.com


 From: fightf...@163.com
 Date: 2015-02-06 17:54
 To: user; dev
 Subject: Sort Shuffle performance issues about using AppendOnlyMap for large
 data sets
 Hi all,
 Recently we hit performance issues when using Spark 1.2.0 to read
 data from HBase and do some summary work.
 Our scenario is: read large data sets from HBase (maybe 100G+ of files),
 form an hbaseRDD, transform it to a SchemaRDD,
 group by and aggregate the data into fewer, new summary data sets, and
 load the data into HBase (Phoenix).

 Our major issue comes down to: aggregating the large data sets to get the
 summary data sets consumes too much time (1 hour+), while the
 performance should not be that bad. We got the dump file attached and
 a stacktrace from jstack like the following:

 From the stacktrace and dump file we can identify that processing the large
 data sets causes frequent AppendOnlyMap growth,
 leading to a huge map entry size. We referenced the source code of
 org.apache.spark.util.collection.AppendOnlyMap and found that
 the map is initialized with a capacity of 64. That would be too small
 for our use case.

 So the question is: has anyone encountered such issues before? How was
 that resolved? I cannot find any JIRA issues for such problems, and
 if someone has seen this, please kindly let us know.

 A more specific question would be: is there any way for the user to
 define the map capacity in Spark? If so, please
 tell us how to achieve that.

 Best Thanks,
 Sun.

Thread 22432: (state = IN_JAVA)
 - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
 line=224 (Compiled frame; information may be imprecise)
 - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
 @bci=1, line=38 (Interpreted frame)
 - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
 line=198 (Compiled frame)
 -
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=201, line=145 (Compiled frame)
 -
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=3, line=32 (Compiled frame)
 -
 org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
 @bci=141, line=205 (Compiled frame)
 -
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
 @bci=74, line=58 (Interpreted frame)
 -
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
 @bci=169, line=68 (Interpreted frame)
 -
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
 @bci=2, line=41 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
 frame)
 - 

Re: Getting Access is denied error while cloning Spark source using Eclipse

2015-05-12 Thread Akhil Das
Maybe you should check where exactly it's throwing the permission-denied error
(possibly while trying to write to some directory). You can also try manually
cloning the git repo to a directory and then opening that in Eclipse.

Thanks
Best Regards

On Tue, May 12, 2015 at 3:46 PM, Chandrashekhar Kotekar 
shekhar.kote...@gmail.com wrote:

 Hi,

 I am trying to clone the Spark source using Eclipse. After I provide the Spark
 source URL, Eclipse downloads some code, which I can see in the download
 location, but as soon as downloading reaches 99% Eclipse throws a Git
 repository clone failed. Access is denied error.

 Has anyone encountered such a problem? I want to contribute to the Apache Spark
 source code, and I am a newbie trying to contribute to an open source
 project for the first time. Can anyone please help me solve this error?

 Regards,
 Chandrash3khar Kotekar
 Mobile - +91 8600011455



Getting Access is denied error while cloning Spark source using Eclipse

2015-05-12 Thread Chandrashekhar Kotekar
Hi,

I am trying to clone the Spark source using Eclipse. After I provide the Spark
source URL, Eclipse downloads some code, which I can see in the download
location, but as soon as downloading reaches 99% Eclipse throws a Git
repository clone failed. Access is denied error.

Has anyone encountered such a problem? I want to contribute to the Apache Spark
source code, and I am a newbie trying to contribute to an open source
project for the first time. Can anyone please help me solve this error?

Regards,
Chandrash3khar Kotekar
Mobile - +91 8600011455


@since version tag for all dataframe/sql methods

2015-05-12 Thread Reynold Xin
I added a @since version tag for all public DataFrame/SQL methods/classes in
this patch: https://github.com/apache/spark/pull/6101/files

From now on, if you merge anything related to DF/SQL, please make sure the
public functions have a @since tag. Thanks.
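
For reference, a sketch of where the tag goes; the method below is hypothetical and only illustrates the Scaladoc placement:

import org.apache.spark.sql.DataFrame

class MyDataFrameHelpers(df: DataFrame) {
  /**
   * Returns the first n rows as a new DataFrame. Hypothetical method, shown
   * only to illustrate where the Scaladoc tag goes on a public API.
   *
   * @since 1.4.0
   */
  def limitedCopy(n: Int): DataFrame = df.limit(n)
}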


Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-12 Thread Night Wolf
I'm seeing a similar thing with a slightly different stack trace. Ideas?

org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:64)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)


On Tue, May 12, 2015 at 5:55 AM, Reynold Xin r...@databricks.com wrote:

 Looks like it is spending a lot of time doing hash probing. It could be a
 number of the following:

 1. hash probing itself is inherently expensive compared with rest of your
 workload

 2. murmur3 doesn't work well with this key distribution

 3. quadratic probing (triangular sequence) with a power-of-2 hash table
 works really badly for this workload.

 One way to test this is to instrument the changeValue function to store the
 total number of probes, and then log it. We added this probing
 capability to the new Bytes2Bytes hash map we built. We should consider
 just having it reported as a built-in metric to facilitate
 debugging.


 https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214






 On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com
 wrote:

  This is the stack trace of the worker thread:
 
 
 
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
 
 
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
 
 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
  org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
 
 
 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
  org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  org.apache.spark.scheduler.Task.run(Task.scala:64)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:745)
 
  On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:
 
  Do you have any more specific profiling data that you can share?  I'm
  curious to know where AppendOnlyMap.changeValue is being called from.
 
  On Fri, May 8, 2015 at 1:26 PM, Michal Haris 
 michal.ha...@visualdna.com
  wrote:
 
  +dev
  On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com
 wrote:
 
   Just wanted to check if somebody has seen similar behaviour or knows
  what
   we might be doing wrong. We have a relatively complex spark
 application
   which processes half a terabyte of data at various stages. We have
  profiled
   it in several ways and everything seems to point to one place where
  90% of
   the time is spent:  AppendOnlyMap.changeValue. The job scales and is
   relatively faster than its map-reduce alternative but it still feels
  slower
   than it should be. I am suspecting too much spill but I haven't seen
  any
   improvement by increasing number of partitions to 10k. Any idea would
  be
   appreciated.
  
   --
   Michal Haris
   Technical Architect
   direct line: +44 (0) 207 749 0229
   www.visualdna.com | t: +44 (0) 207 734 7033,
  
 
 
 
 
 
  --
  Michal Haris
  Technical Architect
  direct line: +44 (0) 207 749 0229
  www.visualdna.com | t: +44 (0) 207 734 7033,
 



Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

2015-05-12 Thread Night Wolf
Seeing similar issues, did you find a solution? One would be to increase
the number of partitions if you're doing lots of object creation.
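
As a hedged illustration of the partitions suggestion, a self-contained sketch that passes an explicit (larger) partition count to the wide operation so each reduce task's AppendOnlyMap holds less data (the path and the figure of 2000 are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: give the wide operation an explicit (larger) partition
// count so each reduce task's AppendOnlyMap holds less data. The input path
// and the figure of 2000 partitions are placeholders, not recommendations.
object MorePartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("more-partitions"))
    val counts = sc.textFile("hdfs:///tmp/events")
      .map(line => (line.split(',')(0), 1L))
      .reduceByKey(_ + _, numPartitions = 2000)
    println(counts.count())
    sc.stop()
  }
}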

On Thu, Feb 12, 2015 at 7:26 PM, fightf...@163.com fightf...@163.com
wrote:

 Hi Patrick,

 Really glad to get your reply.
 Yes, we are doing group-by operations in our job. We know that this is
 common for growTable when processing large data sets.

 The question actually comes down to: is there any way to override the
 initialCapacity specifically for our application? Does Spark provide a
 config for achieving that?

 We know this is tricky to get working. We just want to know how it could be
 resolved, or whether there is some other channel we haven't covered.

 Expecting your kind advice.

 Thanks,
 Sun.

 --
 fightf...@163.com


 *From:* Patrick Wendell pwend...@gmail.com
 *Date:* 2015-02-12 16:12
 *To:* fightf...@163.com
 *CC:* user u...@spark.apache.org; dev dev@spark.apache.org
 *Subject:* Re: Re: Sort Shuffle performance issues about using
 AppendOnlyMap for large data sets
 The map will start with a capacity of 64, but will grow to accommodate
 new data. Are you using the groupBy operator in Spark or are you using
 Spark SQL's group by? This usually happens if you are grouping or
 aggregating in a way that doesn't sufficiently condense the data
 created from each input partition.

 - Patrick

 On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com fightf...@163.com
 wrote:
  Hi,
 
  We still have no adequate solution for this issue. Any available
  analytical rules or hints would be appreciated.
 
  Thanks,
  Sun.
 
  
  fightf...@163.com
 
 
  From: fightf...@163.com
  Date: 2015-02-09 11:56
  To: user; dev
  Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap
 for
  large data sets
  Hi,
  The problem still exists. Would any experts take a look at this?
 
  Thanks,
  Sun.
 
  
  fightf...@163.com
 
 
  From: fightf...@163.com
  Date: 2015-02-06 17:54
  To: user; dev
  Subject: Sort Shuffle performance issues about using AppendOnlyMap for
 large
  data sets
  Hi all,
  Recently we hit performance issues when using Spark 1.2.0 to read
  data from HBase and do some summary work.
  Our scenario is: read large data sets from HBase (maybe 100G+ of files),
  form an hbaseRDD, transform it to a SchemaRDD,
  group by and aggregate the data into fewer, new summary data sets, and
  load the data into HBase (Phoenix).

  Our major issue comes down to: aggregating the large data sets to get the
  summary data sets consumes too much time (1 hour+), while the
  performance should not be that bad. We got the dump file attached and
  a stacktrace from jstack like the following:

  From the stacktrace and dump file we can identify that processing the large
  data sets causes frequent AppendOnlyMap growth,
  leading to a huge map entry size. We referenced the source code of
  org.apache.spark.util.collection.AppendOnlyMap and found that
  the map is initialized with a capacity of 64. That would be too small
  for our use case.

  So the question is: has anyone encountered such issues before? How was
  that resolved? I cannot find any JIRA issues for such problems, and
  if someone has seen this, please kindly let us know.

  A more specific question would be: is there any way for the user to
  define the map capacity in Spark? If so, please
  tell us how to achieve that.

  Best Thanks,
  Sun.
 
 Thread 22432: (state = IN_JAVA)
  - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
  line=224 (Compiled frame; information may be imprecise)
  - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
  @bci=1, line=38 (Interpreted frame)
  - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
  line=198 (Compiled frame)
  -
 
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
  scala.Function2) @bci=201, line=145 (Compiled frame)
  -
 
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
  scala.Function2) @bci=3, line=32 (Compiled frame)
  -
 
 org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
  @bci=141, line=205 (Compiled frame)
  -
 
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
  @bci=74, line=58 (Interpreted frame)
  -
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
  @bci=169, line=68 (Interpreted frame)
  -
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
  @bci=2, line=41 (Interpreted frame)
  - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
  frame)
  - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
  (Interpreted frame)
  -
 
 

Re: Adding/Using More Resolution Types on JIRA

2015-05-12 Thread Sean Owen
I tend to find that any large project has a lot of walking dead JIRAs, and
pretending they are simply Open causes problems. Any state is better for
these, so I favor this.

The possible objection is that this will squash or hide useful issues, but
in practice we have the opposite problem. Resolved issues are still
searchable by default, and, people aren't shy about opening duplicates
anyway. At least the semantics of Later do not discourage a diligent searcher
from considering commenting on and reopening such an archived JIRA.

Patrick this could piggy back on INFRA-9513.

As a corollary I would welcome deciding that Target Version should be used
more narrowly to mean 'I really mean to help resolve this for the indicated
version'. Setting it to a future version just to mean Later should instead
turn into resolving the JIRA.

Last: if JIRAs are regularly ice-boxed this way, I think it should trigger
some reflection. Why are these JIRAs going nowhere? For completely normal
reasons or does it mean too many TODOs are filed and forgotten? That's no
comment on the current state, just something to watch.

So: yes I like the idea.
On May 12, 2015 8:50 AM, Patrick Wendell pwend...@gmail.com wrote:

 In Spark we sometimes close issues as something other than Fixed,
 and this is an important part of maintaining our JIRA.

 The current resolution types we use are the following:

 Won't Fix - bug fix or (more often) feature we don't want to add
 Invalid - issue is underspecified or not appropriate for a JIRA issue
 Duplicate - duplicate of another JIRA
 Cannot Reproduce - bug that could not be reproduced
 Not A Problem - issue purports to represent a bug, but does not

 I would like to propose adding a few new resolutions. This will
 require modifying the ASF JIRA, but infra said they are open to
 proposals as long as they are considered of broad interest.

 My issue with the current set of resolutions is that Won't Fix is a
 big catch-all we use for many different things. Most often it's used
 for things that aren't even bugs, even though it has Fix in the name.
 I'm proposing adding:

 Inactive - A feature or bug that has had no activity from users or
 developers in a long time
 Out of Scope - A feature proposal that is not in scope given the project's
 goals
 Later - A feature not on the immediate roadmap, but potentially of
 interest longer term (this one already exists, I'm just proposing to
 start using it)

 I am in no way proposing changes to the decision-making model around
 JIRAs, notably that it is consensus-based and that all resolutions
 are considered tentative and fully reversible.

 The benefits I see of this change would be the following:
 1. Inactive: A way to clear out inactive/dead JIRAs without
 indicating a decision has been made one way or the other.
 2. Out of Scope: It explains closing out-of-scope features more
 clearly than the generic Won't Fix. It also makes it clearer to
 future contributors what is considered in scope for Spark.
 3. Later: A way to signal that issues aren't targeted for a near-term
 version. This would help avoid the mess we have now, with 200+ issues
 targeted at each version and Target Version being a very poor
 indicator of the actual roadmap. An alternative on this one is to
 have a version called Later or Parking Lot, but not close the issues.

 Any thoughts on this?

 - Patrick





Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-12 Thread Matei Zaharia
It could also be that your hash function is expensive. What is the key class 
you have for the reduceByKey / groupByKey?
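
For what it's worth, if the key turns out to be a heavy composite object, one common mitigation (a general sketch, not something proposed in this thread; the fields are made up) is to compute the hash once per key instead of on every probe:

// Hypothetical key type: hashing the full payload on every probe can dominate,
// so compute the hash once per key. Field names are made up for illustration.
class SegmentKey(val userId: Long, val tags: Array[String]) extends Serializable {
  private val cachedHash: Int = {
    var h = (userId ^ (userId >>> 32)).toInt
    tags.foreach(t => h = 31 * h + t.hashCode)
    h
  }
  override def hashCode: Int = cachedHash
  override def equals(other: Any): Boolean = other match {
    case k: SegmentKey => k.userId == userId && k.tags.sameElements(tags)
    case _             => false
  }
}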

Matei

 On May 12, 2015, at 10:08 AM, Night Wolf nightwolf...@gmail.com wrote:
 
 I'm seeing a similar thing with a slightly different stack trace. Ideas?
 
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:205)
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 org.apache.spark.scheduler.Task.run(Task.scala:64)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 
 On Tue, May 12, 2015 at 5:55 AM, Reynold Xin r...@databricks.com wrote:
 Looks like it is spending a lot of time doing hash probing. It could be a
 number of the following:
 
 1. hash probing itself is inherently expensive compared with rest of your
 workload
 
 2. murmur3 doesn't work well with this key distribution
 
 3. quadratic probing (triangular sequence) with a power-of-2 hash table
 works really badly for this workload.
 
 One way to test this is to instrument the changeValue function to store the
 total number of probes, and then log it. We added this probing
 capability to the new Bytes2Bytes hash map we built. We should consider
 just having it reported as a built-in metric to facilitate
 debugging.
 
 https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214
 
 
 
 
 
 
 On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com
 wrote:
 
  This is the stack trace of the worker thread:
 
 
  org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
 
  org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
 
  org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
  org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
 
  org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
  org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  org.apache.spark.scheduler.Task.run(Task.scala:64)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:745)
 
   On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:
 
  Do you have any more specific profiling data that you can share?  I'm
  curious to know where AppendOnlyMap.changeValue is being called from.
 
   On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com
  wrote:
 
  +dev
   On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote:
 
   Just wanted to check if somebody has seen similar behaviour or knows
  what
   we might be doing wrong. We have a relatively complex spark application
   which processes half a terabyte of data at various stages. We have
  profiled
   it in several ways and everything seems to point to one place where
  90% of
   the time is spent:  AppendOnlyMap.changeValue. The job scales and is
   relatively faster than its map-reduce alternative but it still