Problems with spark.locality.wait

2014-11-13 Thread MaChong
Hi,

We are running a time-sensitive application with 70 partitions of about 800MB
each. The application first loads data from a database in a different cluster,
then applies a filter, caches the filtered data, then applies a map and a
reduce, and finally collects the results.
The application finishes in 20 seconds if we set spark.locality.wait to a
large value (30 minutes), but it takes 100 seconds if we set
spark.locality.wait to a small value (less than 10 seconds).
We have analysed the driver log and found a lot of NODE_LOCAL and RACK_LOCAL
level tasks; normally a PROCESS_LOCAL task takes only 15 seconds, but a
NODE_LOCAL or RACK_LOCAL task takes 70 seconds.
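
For reference, we set the property like this on the driver (a minimal sketch
using the standard SparkConf API; in Spark 1.x the value is in milliseconds,
and the application name is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// 30 minutes, expressed in milliseconds.
val conf = new SparkConf()
  .setAppName("locality-wait-test")
  .set("spark.locality.wait", "1800000")
val sc = new SparkContext(conf)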

So I think we'd better set spark.locality.wait to a large value (30 minutes),
until we met this problem:

Now our application loads data from HDFS in the same Spark cluster, so it gets
NODE_LOCAL and RACK_LOCAL level tasks during the loading stage. If the tasks in
the loading stage all have the same locality level, either NODE_LOCAL or
RACK_LOCAL, it works fine.
But if the tasks in the loading stage get mixed locality levels, such as 3
NODE_LOCAL tasks and 2 RACK_LOCAL tasks, then the TaskSetManager of the loading
stage will submit the 3 NODE_LOCAL tasks as soon as resources are offered and
then wait for spark.locality.wait.node, which was set to 30 minutes, so the 2
RACK_LOCAL tasks will wait 30 minutes even though resources are available.


Has anyone else met this problem? Do you have a nice solution?


Thanks




Ma chong


Re: [VOTE] Release Apache Spark 1.1.1 (RC1)

2014-11-13 Thread Sean Owen
LICENSE and NOTICE are fine. Signature and checksum are fine. I
unzipped and built the plain source distribution, which built.

However I am seeing a consistent test failure with mvn -DskipTests
clean package; mvn test. In the Hive module:

- SET commands semantics for a HiveContext *** FAILED ***
  Expected Array(spark.sql.key.usedfortestonly=test.val.0,
spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0),
but got 
Array(spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0,
spark.sql.key.usedfortestonly=test.val.0) (HiveQuerySuite.scala:544)

Anyone else seeing this?


On Thu, Nov 13, 2014 at 8:18 AM, Krishna Sankar ksanka...@gmail.com wrote:
 +1
 1. Compiled OSX 10.10 (Yosemite) mvn -Pyarn -Phadoop-2.4
 -Dhadoop.version=2.4.0 -DskipTests clean package 10:49 min
 2. Tested pyspark, MLlib
 2.1. statistics OK
 2.2. Linear/Ridge/Lasso Regression OK
 2.3. Decision Tree, Naive Bayes OK
 2.4. KMeans OK
 2.5. rdd operations OK
 2.6. recommendation OK
 2.7. Good work ! In 1.1.0, there was an error and my program used to hang
 (over memory allocation) consistently running validation using itertools,
 compute optimum rank, lambda,numofiterations/rmse; data - movielens medium
 dataset (1 million records) . It works well in 1.1.1 !
 Cheers
 k/
 P.S: Missed Reply all, first time

 On Wed, Nov 12, 2014 at 8:35 PM, Andrew Or and...@databricks.com wrote:

 I will start the vote with a +1

 2014-11-12 20:34 GMT-08:00 Andrew Or and...@databricks.com:

  Please vote on releasing the following candidate as Apache Spark version
 1
  .1.1.
 
  This release fixes a number of bugs in Spark 1.1.0. Some of the notable
  ones are
  - [SPARK-3426] Sort-based shuffle compression settings are incompatible
  - [SPARK-3948] Stream corruption issues in sort-based shuffle
  - [SPARK-4107] Incorrect handling of Channel.read() led to data
 truncation
  The full list is at http://s.apache.org/z9h and in the CHANGES.txt
  attached.
 
  The tag to be voted on is v1.1.1-rc1 (commit 72a4fdbe):
  http://s.apache.org/cZC
 
  The release files, including signatures, digests, etc can be found at:
  http://people.apache.org/~andrewor14/spark-1.1.1-rc1/
 
  Release artifacts are signed with the following key:
  https://people.apache.org/keys/committer/andrewor14.asc
 
  The staging repository for this release can be found at:
  https://repository.apache.org/content/repositories/orgapachespark-1034/
 
  The documentation corresponding to this release can be found at:
  http://people.apache.org/~andrewor14/spark-1.1.1-rc1-docs/
 
  Please vote on releasing this package as Apache Spark 1.1.1!
 
  The vote is open until Sunday, November 16, at 04:30 UTC and passes if
  a majority of at least 3 +1 PMC votes are cast.
  [ ] +1 Release this package as Apache Spark 1.1.1
  [ ] -1 Do not release this package because ...
 
  To learn more about Apache Spark, please see
  http://spark.apache.org/
 
  Cheers,
  Andrew
 


-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Join operator in PySpark

2014-11-13 Thread 夏俊鸾
Hi all

I have noticed that the “join” operator is implemented with union and
groupByKey operators instead of the cogroup operator in PySpark. This will
probably generate more shuffle stages. For example:

rdd1 = sc.makeRDD(...).partitionBy(2)
rdd2 = sc.makeRDD(...).partitionBy(2)
rdd3 = rdd1.join(rdd2).collect()

The code above generates 2 shuffles when implemented in Scala, but 3 shuffles
with Python. What was the initial design motivation for the join operator in
PySpark? Any ideas to improve join performance in PySpark?

Andrew


Re: Cache sparkSql data without uncompressing it in memory

2014-11-13 Thread Sadhan Sood
Thanks Cheng. Just one more question: does that mean that we still need
enough memory in the cluster to uncompress the data before it can be
compressed again, or does it just read the raw data as is?

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote:

  Currently there’s no way to cache the compressed sequence file directly.
 Spark SQL uses in-memory columnar format while caching table rows, so we
 must read all the raw data and convert them into columnar format. However,
 you can enable in-memory columnar compression by setting
 spark.sql.inMemoryColumnarStorage.compressed to true. This property is
 already set to true by default in master branch and branch-1.2.

 On 11/13/14 7:16 AM, Sadhan Sood wrote:

   We noticed while caching data from our hive tables which contain data
 in compressed sequence file format that it gets uncompressed in memory when
 getting cached. Is there a way to turn this off and cache the compressed
 data as is ?

   ​



Re: [NOTICE] [BUILD] Minor changes to Spark's build

2014-11-13 Thread Marcelo Vanzin
Hello there,

So I just took a quick look at the pom and I see two problems with it.

- activeByDefault does not work like you think it does. It only
activates by default if you do not explicitly activate other
profiles. So if you do mvn package, scala-2.10 will be activated;
but if you do mvn -Pyarn package, it will not.

- you need to duplicate the activation stuff everywhere where the
profile is declared, not just in the root pom. (I spent quite some
time yesterday fighting a similar issue...)

My suggestion here is to change the activation of scala-2.10 to look like this:

<activation>
  <property>
    <name>!scala-2.11</name>
  </property>
</activation>

And change the scala-2.11 profile to do this:

<properties>
  <scala-2.11>true</scala-2.11>
</properties>

I haven't tested, but in my experience this will activate the
scala-2.10 profile by default, unless you explicitly activate the 2.11
profile, in which case that property will be set and scala-2.10 will
not activate. If you look at examples/pom.xml, that's the same
strategy used to choose which hbase profile to activate.

Ah, and just to reinforce, the activation logic needs to be copied to
other places (e.g. examples/pom.xml, repl/pom.xml, and any other place
that has scala-2.x profiles).



On Wed, Nov 12, 2014 at 11:14 PM, Patrick Wendell pwend...@gmail.com wrote:
 I actually do agree with this - let's see if we can find a solution
 that doesn't regress this behavior. Maybe we can simply move the one
 kafka example into its own project instead of having it in the
 examples project.

 On Wed, Nov 12, 2014 at 11:07 PM, Sandy Ryza sandy.r...@cloudera.com wrote:
 Currently there are no mandatory profiles required to build Spark.  I.e.
 mvn package just works.  It seems sad that we would need to break this.

 On Wed, Nov 12, 2014 at 10:59 PM, Patrick Wendell pwend...@gmail.com
 wrote:

 I think printing an error that says -Pscala-2.10 must be enabled is
 probably okay. It's a slight regression but it's super obvious to
 users. That could be a more elegant solution than the somewhat
 complicated monstrosity I proposed on the JIRA.

 On Wed, Nov 12, 2014 at 10:37 PM, Prashant Sharma scrapco...@gmail.com
 wrote:
  One thing we can do it is print a helpful error and break. I don't know
  about how this can be done, but since now I can write groovy inside
  maven
  build so we have more control. (Yay!!)
 
  Prashant Sharma
 
 
 
  On Thu, Nov 13, 2014 at 12:05 PM, Patrick Wendell pwend...@gmail.com
  wrote:
 
  Yeah Sandy and I were chatting about this today and din't realize
  -Pscala-2.10 was mandatory. This is a fairly invasive change, so I was
  thinking maybe we could try to remove that. Also if someone doesn't
  give -Pscala-2.10 it fails in a way that is initially silent, which is
  bad because most people won't know to do this.
 
  https://issues.apache.org/jira/browse/SPARK-4375
 
  On Wed, Nov 12, 2014 at 10:29 PM, Prashant Sharma
  scrapco...@gmail.com
  wrote:
   Thanks Patrick, I have one suggestion that we should make passing
   -Pscala-2.10 mandatory for maven users. I am sorry for not mentioning
   this
   before. There is no way around not passing that option for maven
   users(only). However, this is unnecessary for sbt users because it is
   added
   automatically if -Pscala-2.11 is absent.
  
  
   Prashant Sharma
  
  
  
   On Wed, Nov 12, 2014 at 3:53 PM, Sean Owen so...@cloudera.com
   wrote:
  
   - Tip: when you rebase, IntelliJ will temporarily think things like
   the
   Kafka module are being removed. Say 'no' when it asks if you want to
   remove
   them.
   - Can we go straight to Scala 2.11.4?
  
   On Wed, Nov 12, 2014 at 5:47 AM, Patrick Wendell
   pwend...@gmail.com
   wrote:
  
Hey All,
   
I've just merged a patch that adds support for Scala 2.11 which
will
have some minor implications for the build. These are due to the
complexities of supporting two versions of Scala in a single
project.
   
1. The JDBC server will now require a special flag to build
-Phive-thriftserver on top of the existing flag -Phive. This is
because some build permutations (only in Scala 2.11) won't support
the
JDBC server yet due to transitive dependency conflicts.
   
2. The build now uses non-standard source layouts in a few
additional
places (we already did this for the Hive project) - the repl and
the
examples modules. This is just fine for maven/sbt, but it may
affect
users who import the build in IDE's that are using these projects
and
want to build Spark from the IDE. I'm going to update our wiki to
include full instructions for making this work well in IntelliJ.
   
If there are any other build related issues please respond to this
thread and we'll make sure they get sorted out. Thanks to Prashant
Sharma who is the author of this feature!
   
- Patrick
   
   
-
To unsubscribe, 

Re: [MLlib] Contributing Algorithm for Outlier Detection

2014-11-13 Thread Ashutosh
Hi Anant,

Please see the changes.

https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala


I have changed the input format to Vector of String. I think we can also make 
it generic.


Lines 59 & 72: that counter will not affect parallelism, since it only works
on one data point. It only does the indexing of the column.


All the remaining side effects have been removed.

​

Thanks,

Ashutosh





From: slcclimber [via Apache Spark Developers List] 
ml-node+s1001551n9287...@n3.nabble.com
Sent: Tuesday, November 11, 2014 11:46 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


Mayur,
Libsvm format sounds good to me. I could work on writing the tests if that 
helps you?
Anant

On Nov 11, 2014 11:06 AM, Ashutosh [via Apache Spark Developers List] wrote:

Hi Mayur,

Vector data types are implemented using breeze library, it is presented at

.../org/apache/spark/mllib/linalg


Anant,

One restriction I found is that a vector can only be of 'Double', so it
actually restricts the user.

What are you thoughts on LibSVM format?

Thanks for the comments, I was just trying to get away from those increment 
/decrement functions, they look ugly. Points are noted. I'll try to fix them 
soon. Tests are also required for the code.


Regards,

Ashutosh



From: Mayur Rustagi [via Apache Spark Developers List]
Sent: Saturday, November 8, 2014 12:52 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


 We should take a vector instead giving the user flexibility to decide
 data source/ type

What do you mean by vector datatype exactly?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Wed, Nov 5, 2014 at 6:45 AM, slcclimber wrote:

 Ashutosh,
 I still see a few issues.
 1. On line 112 you are counting using a counter. Since this will happen in
 an RDD the counter will cause issues. Also it is not good functional style
 to use a filter function with a side effect.
 You could use randomSplit instead. It does the same thing without the
 side effect.
 2. Similar shared usage of j in line 102 is going to be an issue as well.
 Also the hash seed does not need to be sequential; it could be randomly
 generated or hashed on the values.
 3. The compute function and trim scores still run on a comma-separated
 RDD. We should take a vector instead, giving the user flexibility to decide
 the data source/type. What if we want data from Hive tables or Parquet or
 JSON or Avro formats? This is a very restrictive format. With vectors the
 user has the choice of taking in whatever data format and converting it to
 vectors instead of reading JSON files, creating a CSV file and then working
 on that.
 4. Similar use of counters in 54 and 65 is an issue.
 Basically the shared-state counters are a huge issue that does not scale,
 since the processing of RDDs is distributed and the value j lives on the
 master.

 Anant



  On Tue, Nov 4, 2014 at 7:22 AM, Ashutosh [via Apache Spark Developers List] wrote:

   Anant,
 
  I got rid of those increment/ decrements functions and now code is much
  cleaner. Please check. All your comments have been looked after.
 
 
 
 https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala
 
 
   _Ashu
 
  
 https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala
 
 
 
   --
  *From:* slcclimber [via Apache Spark Developers List]
  *Sent:* Friday, October 31, 2014 10:09 AM
  *To:* Ashutosh Trivedi (MT2013030)
  *Subject:* Re: [MLlib] Contributing Algorithm for Outlier Detection
 
 
  You should create a jira ticket to go with it as well.
  Thanks
  On Oct 30, 2014 10:38 PM, Ashutosh [via Apache Spark Developers List] wrote:
 
   ​Okay. I'll try it and post it soon with test case. After that I think
  we can go ahead with the PR.
   --
  *From:* slcclimber [via Apache Spark Developers List] 

Re: [NOTICE] [BUILD] Minor changes to Spark's build

2014-11-13 Thread Marcelo Vanzin
Hey Patrick,

On Thu, Nov 13, 2014 at 10:49 AM, Patrick Wendell pwend...@gmail.com wrote:
 I'm not sure chaining activation works like that. At least in my
 experience activation based on properties only works for properties
 explicitly specified at the command line rather than declared
 elsewhere in the pom.

That's true, but note that the code I posted activates a profile based on
the lack of a property being set, which is why it works. Granted, I did not
test whether the profile with the property check gets disabled if you
activate the other profile.

 In any case, I think Prashant just didn't document that his patch
 required -Pscala-2.10 explicitly, which is what he said further up in
 the thread. And Sandy has a solution that has better behavior than
 that, which is nice.

Yeah, I saw Sandy's patch now and it's probably a better solution
(since it doesn't abuse the sort of tricky maven profile stuff as
much).

-- 
Marcelo

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [NOTICE] [BUILD] Minor changes to Spark's build

2014-11-13 Thread Marcelo Vanzin
On Thu, Nov 13, 2014 at 10:58 AM, Patrick Wendell pwend...@gmail.com wrote:
 That's true, but note the code I posted activates a profile based on
 the lack of a property being set, which is why it works. Granted, I
 did not test that if you activate the other profile, the one with the
 property check will be disabled.

 Ah yeah good call - so then we'd trigger 2.11-vs-not based on the
 presence of -Dscala-2.11.

 Would that fix this issue then? It might be a simpler fix to merge
 into the 1.2 branch than Sandy's patch since we're pretty late in the
 game (though that patch does other things separately that I'd like to
 see end up in Spark soon).

Yeah, that's the idea. As for simplicity, I think Sandy's patch would
be just as simple if it avoided all the changes to isolate the
examples / external stuff into different profiles.

-- 
Marcelo

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Problems with spark.locality.wait

2014-11-13 Thread Kay Ousterhout
Hi,

Shivaram and I stumbled across this problem a few weeks ago, and AFAIK
there is no nice solution.  We worked around it by avoiding jobs that have
tasks with two locality levels.

To fix this problem, we really need to fix the underlying problem in the
scheduling code, which currently tries to schedule all tasks at the minimum
locality of any of the tasks in the job.  Unfortunately, this involves
adding a bunch of complexity to the scheduling code.

Patrick had previously convinced us that we were the only ones running into
this problem, so it wasn't worth fixing (especially because we found an OK
workaround for our case).  It would be useful to hear if any other folks
have run into this problem -- it sounds like maybe we should go ahead and
fix the scheduling code.

I've filed a JIRA to track this:
https://issues.apache.org/jira/browse/SPARK-4383

-Kay

On Wed, Nov 12, 2014 at 11:55 PM, MaChong machon...@sina.com wrote:

 Hi,

 We are running a time sensitive application with 70 partition and 800MB
 each parition size. The application first load data from database in
 different cluster, then apply a filter, cache the filted data, then apply a
 map and a reduce, finally collect results.
 The application will be finished in 20 seconds if we set
 spark.locality.wait to a large value (30 minutes). And it will use 100
 seconds, if we set spark.locality.wait a small value(less than 10 seconds)
 We have analysed the driver log and found lot of NODE_LOCAL and RACK_LOCAL
 level tasks, normally a PROCESS_LOCAL task only takes 15 seconds, but
 NODE_LOCAL or RACK_LOCAL tasks will take 70 seconds.

 So I think we'd better set spark.locality.wait to a large value(30
 minutes), until we meet this problem:

 Now our application will load data from hdfs in the same spark cluster, it
 will get NODE_LOCAL and RACK_LOCAL level tasks during loading stage, if the
 tasks in loading stage have same locality level, ether NODE_LOCAL or
 RACK_LOCAL it works fine.
 But if the tasks in loading stage get mixed locality level, such as 3
 NODE_LOCAL tasks, and 2 RACK_LOCAL tasks, then the TaskSetManager of
 loading stage will submit the 3 NODE_LOCAL tasks as soon as resources were
 offered, then wait for spark.locality.wait.node, which was setted to 30
 minutes, the 2 RACK_LOCAL tasks will wait 30 minutes even though resources
 are avaliable.


 Does any one have met this problem? Do you have a nice solution?


 Thanks




 Ma chong



Re: [VOTE] Release Apache Spark 1.1.1 (RC1)

2014-11-13 Thread Michael Armbrust
Hey Sean,

Thanks for pointing this out.  Looks like a bad test where we should be
doing Set comparison instead of Array.
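
Concretely, I'd expect the fix to look something like this in the test (a
sketch using ScalaTest assertions, not the actual HiveQuerySuite code; the
result/expected names are placeholders):

// Compare the SET command output order-insensitively.
assert(actualResults.toSet === expectedResults.toSet)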

Michael

On Thu, Nov 13, 2014 at 2:05 AM, Sean Owen so...@cloudera.com wrote:

 LICENSE and NOTICE are fine. Signature and checksum is fine. I
 unzipped and built the plain source distribution, which built.

 However I am seeing a consistent test failure with mvn -DskipTests
 clean package; mvn test. In the Hive module:

 - SET commands semantics for a HiveContext *** FAILED ***
   Expected Array(spark.sql.key.usedfortestonly=test.val.0,

 spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0),
 but got
 Array(spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0,
 spark.sql.key.usedfortestonly=test.val.0) (HiveQuerySuite.scala:544)

 Anyone else seeing this?


 On Thu, Nov 13, 2014 at 8:18 AM, Krishna Sankar ksanka...@gmail.com
 wrote:
  +1
  1. Compiled OSX 10.10 (Yosemite) mvn -Pyarn -Phadoop-2.4
  -Dhadoop.version=2.4.0 -DskipTests clean package 10:49 min
  2. Tested pyspark, mlib
  2.1. statistics OK
  2.2. Linear/Ridge/Laso Regression OK
  2.3. Decision Tree, Naive Bayes OK
  2.4. KMeans OK
  2.5. rdd operations OK
  2.6. recommendation OK
  2.7. Good work ! In 1.1.0, there was an error and my program used to hang
  (over memory allocation) consistently running validation using itertools,
  compute optimum rank, lambda,numofiterations/rmse; data - movielens
 medium
  dataset (1 million records) . It works well in 1.1.1 !
  Cheers
  k/
  P.S: Missed Reply all, first time
 
  On Wed, Nov 12, 2014 at 8:35 PM, Andrew Or and...@databricks.com
 wrote:
 
  I will start the vote with a +1
 
  2014-11-12 20:34 GMT-08:00 Andrew Or and...@databricks.com:
 
   Please vote on releasing the following candidate as Apache Spark
 version
  1
   .1.1.
  
   This release fixes a number of bugs in Spark 1.1.0. Some of the
 notable
   ones are
   - [SPARK-3426] Sort-based shuffle compression settings are
 incompatible
   - [SPARK-3948] Stream corruption issues in sort-based shuffle
   - [SPARK-4107] Incorrect handling of Channel.read() led to data
  truncation
   The full list is at http://s.apache.org/z9h and in the CHANGES.txt
   attached.
  
   The tag to be voted on is v1.1.1-rc1 (commit 72a4fdbe):
   http://s.apache.org/cZC
  
   The release files, including signatures, digests, etc can be found at:
   http://people.apache.org/~andrewor14/spark-1.1.1-rc1/
  
   Release artifacts are signed with the following key:
   https://people.apache.org/keys/committer/andrewor14.asc
  
   The staging repository for this release can be found at:
  
 https://repository.apache.org/content/repositories/orgapachespark-1034/
  
   The documentation corresponding to this release can be found at:
   http://people.apache.org/~andrewor14/spark-1.1.1-rc1-docs/
  
   Please vote on releasing this package as Apache Spark 1.1.1!
  
   The vote is open until Sunday, November 16, at 04:30 UTC and passes if
   a majority of at least 3 +1 PMC votes are cast.
   [ ] +1 Release this package as Apache Spark 1.1.1
   [ ] -1 Do not release this package because ...
  
   To learn more about Apache Spark, please see
   http://spark.apache.org/
  
   Cheers,
   Andrew
  
 

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




Re: About implicit rddToPairRDDFunctions

2014-11-13 Thread Reynold Xin
Do people usually import o.a.spark.rdd._ ?

Also in order to maintain source and binary compatibility, we would need to
keep both right?


On Thu, Nov 6, 2014 at 3:12 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 I saw many people asked how to convert a RDD to a PairRDDFunctions. I would
 like to ask a question about it. Why not put the following implicit into
 pacakge object rdd or object rdd?

   implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
   (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
 = {
 new PairRDDFunctions(rdd)
   }

 If so, the converting will be automatic and not need to
 import org.apache.spark.SparkContext._

 I tried to search some discussion but found nothing.

 Best Regards,
 Shixiong Zhu



Re: [VOTE] Release Apache Spark 1.1.1 (RC1)

2014-11-13 Thread Andrew Or
Yeah, this seems to be somewhat environment specific too. The same test has
been passing here for a while:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.1-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/lastBuild/consoleFull

2014-11-13 11:26 GMT-08:00 Michael Armbrust mich...@databricks.com:

 Hey Sean,

 Thanks for pointing this out.  Looks like a bad test where we should be
 doing Set comparison instead of Array.

 Michael

 On Thu, Nov 13, 2014 at 2:05 AM, Sean Owen so...@cloudera.com wrote:

 LICENSE and NOTICE are fine. Signature and checksum is fine. I
 unzipped and built the plain source distribution, which built.

 However I am seeing a consistent test failure with mvn -DskipTests
 clean package; mvn test. In the Hive module:

 - SET commands semantics for a HiveContext *** FAILED ***
   Expected Array(spark.sql.key.usedfortestonly=test.val.0,

 spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0),
 but got
 Array(spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0,
 spark.sql.key.usedfortestonly=test.val.0) (HiveQuerySuite.scala:544)

 Anyone else seeing this?


 On Thu, Nov 13, 2014 at 8:18 AM, Krishna Sankar ksanka...@gmail.com
 wrote:
  +1
  1. Compiled OSX 10.10 (Yosemite) mvn -Pyarn -Phadoop-2.4
  -Dhadoop.version=2.4.0 -DskipTests clean package 10:49 min
  2. Tested pyspark, mlib
  2.1. statistics OK
  2.2. Linear/Ridge/Laso Regression OK
  2.3. Decision Tree, Naive Bayes OK
  2.4. KMeans OK
  2.5. rdd operations OK
  2.6. recommendation OK
  2.7. Good work ! In 1.1.0, there was an error and my program used to
 hang
  (over memory allocation) consistently running validation using
 itertools,
  compute optimum rank, lambda,numofiterations/rmse; data - movielens
 medium
  dataset (1 million records) . It works well in 1.1.1 !
  Cheers
  k/
  P.S: Missed Reply all, first time
 
  On Wed, Nov 12, 2014 at 8:35 PM, Andrew Or and...@databricks.com
 wrote:
 
  I will start the vote with a +1
 
  2014-11-12 20:34 GMT-08:00 Andrew Or and...@databricks.com:
 
   Please vote on releasing the following candidate as Apache Spark
 version
  1
   .1.1.
  
   This release fixes a number of bugs in Spark 1.1.0. Some of the
 notable
   ones are
   - [SPARK-3426] Sort-based shuffle compression settings are
 incompatible
   - [SPARK-3948] Stream corruption issues in sort-based shuffle
   - [SPARK-4107] Incorrect handling of Channel.read() led to data
  truncation
   The full list is at http://s.apache.org/z9h and in the CHANGES.txt
   attached.
  
   The tag to be voted on is v1.1.1-rc1 (commit 72a4fdbe):
   http://s.apache.org/cZC
  
   The release files, including signatures, digests, etc can be found
 at:
   http://people.apache.org/~andrewor14/spark-1.1.1-rc1/
  
   Release artifacts are signed with the following key:
   https://people.apache.org/keys/committer/andrewor14.asc
  
   The staging repository for this release can be found at:
  
 https://repository.apache.org/content/repositories/orgapachespark-1034/
  
   The documentation corresponding to this release can be found at:
   http://people.apache.org/~andrewor14/spark-1.1.1-rc1-docs/
  
   Please vote on releasing this package as Apache Spark 1.1.1!
  
   The vote is open until Sunday, November 16, at 04:30 UTC and passes
 if
   a majority of at least 3 +1 PMC votes are cast.
   [ ] +1 Release this package as Apache Spark 1.1.1
   [ ] -1 Do not release this package because ...
  
   To learn more about Apache Spark, please see
   http://spark.apache.org/
  
   Cheers,
   Andrew
  
 

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org





Re: Join operator in PySpark

2014-11-13 Thread Josh Rosen
We should implement this using cogroup(); it will just require some tracking to 
map Python partitioners into dummy Java ones so that Java Spark’s cogroup() 
operator respects Python’s partitioning.  I’m sure that there are some other 
subtleties, particularly if we mix datasets that use different serializers / 
Java object representations.

There’s a longstanding JIRA to fix this: 
https://issues.apache.org/jira/browse/SPARK-655
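
For reference, Scala Spark already expresses join in terms of cogroup; a
simplified sketch of that shape (not the exact Spark source, and the method
name here is made up):

import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Group both sides by key with a single cogroup, then emit the cross product
// of values for each key.
def joinViaCogroup[K: ClassTag, V: ClassTag, W: ClassTag](
    left: RDD[(K, V)], right: RDD[(K, W)]): RDD[(K, (V, W))] =
  left.cogroup(right).flatMapValues { case (vs, ws) =>
    for (v <- vs; w <- ws) yield (v, w)
  }

The Python version would additionally need the partitioner bookkeeping
described above on top of something like this.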

On November 13, 2014 at 5:08:15 AM, 夏俊鸾 (xiajunl...@gmail.com) wrote:

Hi all  

I have noticed that “Join” operator has been transferred to union and  
groupByKey operator instead of cogroup operator in PySpark, this change  
will probably generate more shuffle stage, for example  

rdd1 = sc.makeRDD(...).partitionBy(2)  
rdd2 = sc.makeRDD(...).partitionBy(2)  
rdd3 = rdd1.join(rdd2).collect()  

Above code implemented with scala will generate 2 shuffle, but will  
generate 3 shuffle with python. what is initial design motivation of join  
operator in PySpark? Any idea to improve join performance in PySpark?  

Andrew  


Re: [VOTE] Release Apache Spark 1.1.1 (RC1)

2014-11-13 Thread Sean Owen
Ah right. This is because I'm running Java 8. This was fixed in
SPARK-3329 
(https://github.com/apache/spark/commit/2b7ab814f9bde65ebc57ebd04386e56c97f06f4a#diff-7bfd8d7c8cbb02aa0023e4c3497ee832).
Consider back-porting it if other reasons arise, but this is specific
to tests and to Java 8.

On Thu, Nov 13, 2014 at 8:01 PM, Andrew Or and...@databricks.com wrote:
 Yeah, this seems to be somewhat environment specific too. The same test has
 been passing here for a while:
 https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.1-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/lastBuild/consoleFull

 2014-11-13 11:26 GMT-08:00 Michael Armbrust mich...@databricks.com:

 Hey Sean,

 Thanks for pointing this out.  Looks like a bad test where we should be
 doing Set comparison instead of Array.

 Michael

 On Thu, Nov 13, 2014 at 2:05 AM, Sean Owen so...@cloudera.com wrote:

 LICENSE and NOTICE are fine. Signature and checksum is fine. I
 unzipped and built the plain source distribution, which built.

 However I am seeing a consistent test failure with mvn -DskipTests
 clean package; mvn test. In the Hive module:

 - SET commands semantics for a HiveContext *** FAILED ***
   Expected Array(spark.sql.key.usedfortestonly=test.val.0,

 spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0),
 but got
 Array(spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0,
 spark.sql.key.usedfortestonly=test.val.0) (HiveQuerySuite.scala:544)

 Anyone else seeing this?


 On Thu, Nov 13, 2014 at 8:18 AM, Krishna Sankar ksanka...@gmail.com
 wrote:
  +1
  1. Compiled OSX 10.10 (Yosemite) mvn -Pyarn -Phadoop-2.4
  -Dhadoop.version=2.4.0 -DskipTests clean package 10:49 min
  2. Tested pyspark, mlib
  2.1. statistics OK
  2.2. Linear/Ridge/Laso Regression OK
  2.3. Decision Tree, Naive Bayes OK
  2.4. KMeans OK
  2.5. rdd operations OK
  2.6. recommendation OK
  2.7. Good work ! In 1.1.0, there was an error and my program used to
  hang
  (over memory allocation) consistently running validation using
  itertools,
  compute optimum rank, lambda,numofiterations/rmse; data - movielens
  medium
  dataset (1 million records) . It works well in 1.1.1 !
  Cheers
  k/
  P.S: Missed Reply all, first time
 
  On Wed, Nov 12, 2014 at 8:35 PM, Andrew Or and...@databricks.com
  wrote:
 
  I will start the vote with a +1
 
  2014-11-12 20:34 GMT-08:00 Andrew Or and...@databricks.com:
 
   Please vote on releasing the following candidate as Apache Spark
   version
  1
   .1.1.
  
   This release fixes a number of bugs in Spark 1.1.0. Some of the
   notable
   ones are
   - [SPARK-3426] Sort-based shuffle compression settings are
   incompatible
   - [SPARK-3948] Stream corruption issues in sort-based shuffle
   - [SPARK-4107] Incorrect handling of Channel.read() led to data
  truncation
   The full list is at http://s.apache.org/z9h and in the CHANGES.txt
   attached.
  
   The tag to be voted on is v1.1.1-rc1 (commit 72a4fdbe):
   http://s.apache.org/cZC
  
   The release files, including signatures, digests, etc can be found
   at:
   http://people.apache.org/~andrewor14/spark-1.1.1-rc1/
  
   Release artifacts are signed with the following key:
   https://people.apache.org/keys/committer/andrewor14.asc
  
   The staging repository for this release can be found at:
  
   https://repository.apache.org/content/repositories/orgapachespark-1034/
  
   The documentation corresponding to this release can be found at:
   http://people.apache.org/~andrewor14/spark-1.1.1-rc1-docs/
  
   Please vote on releasing this package as Apache Spark 1.1.1!
  
   The vote is open until Sunday, November 16, at 04:30 UTC and passes
   if
   a majority of at least 3 +1 PMC votes are cast.
   [ ] +1 Release this package as Apache Spark 1.1.1
   [ ] -1 Do not release this package because ...
  
   To learn more about Apache Spark, please see
   http://spark.apache.org/
  
   Cheers,
   Andrew
  
 

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Problems with spark.locality.wait

2014-11-13 Thread Kay Ousterhout
Hi Mridul,

In the case Shivaram and I saw, and based on my understanding of Ma chong's
description, I don't think that completely fixes the problem.

To be very concrete, suppose your job has two tasks, t1 and t2, and they
each have input data (in HDFS) on h1 and h2, respectively, and that h1 and
h2 are on the same rack. Suppose your Spark job gets allocated two
executors, one on h1 and another on h3 (a different host with no input
data).  When the job gets submitted to the task set manager (TSM),
TSM.computeValidLocalityLevels will determine that the valid levels are
NODE_LOCAL (because t1 could be run on the NODE_LOCAL executor on h1),
RACK_LOCAL, ANY.  As a result, the TSM will not schedule t2 until
spark.locality.wait.NODE_LOCAL expires, even though t2 has no hope of being
scheduled on a NODE_LOCAL machine (because the job wasn't given any
executors on h2).  You could set spark.locality.wait.NODE_LOCAL to be low,
but then it might cause t1 (or more generally, in a larger job, other tasks
that have NODE_LOCAL executors where they can be scheduled) to get
scheduled on h3 (and not on h1).

Is there a way you were thinking of configuring things that avoids this
problem?

I'm pretty sure we could fix this problem by tracking more information
about each task in the TSM -- for example, the TSM has enough information
to know that there are no NODE_LOCAL executors where t2 could be scheduled
in the above example (and that the best possible locality level for t2 is
RACK_LOCAL), and could schedule t2 right away on a RACK_LOCAL machine.  Of
course, this would add a bunch of complexity to the TSM, hence the earlier
decision that the added complexity may not be worth it.

-Kay

On Thu, Nov 13, 2014 at 12:11 PM, Mridul Muralidharan mri...@gmail.com
wrote:

 Instead of setting spark.locality.wait, try setting individual
 locality waits specifically.

 Namely, spark.locality.wait.PROCESS_LOCAL to high value (so that
 process local tasks are always scheduled in case the task set has
 process local tasks).
 Set spark.locality.wait.NODE_LOCAL and spark.locality.wait.RACK_LOCAL
 to low value - so that in case task set has no process local tasks,
 both node local and rack local tasks are scheduled asap.

 From your description, this will alleviate the problem you mentioned.


 Kay's comment, IMO, is slightly general in nature - and I suspect
 unless we overhaul how preferred locality is specified, and allow for
 taskset specific hints for schedule, we cant resolve that IMO.


 Regards,
 Mridul



 On Thu, Nov 13, 2014 at 1:25 PM, MaChong machon...@sina.com wrote:
  Hi,
 
  We are running a time sensitive application with 70 partition and 800MB
 each parition size. The application first load data from database in
 different cluster, then apply a filter, cache the filted data, then apply a
 map and a reduce, finally collect results.
  The application will be finished in 20 seconds if we set
 spark.locality.wait to a large value (30 minutes). And it will use 100
 seconds, if we set spark.locality.wait a small value(less than 10 seconds)
  We have analysed the driver log and found lot of NODE_LOCAL and
 RACK_LOCAL level tasks, normally a PROCESS_LOCAL task only takes 15
 seconds, but NODE_LOCAL or RACK_LOCAL tasks will take 70 seconds.
 
  So I think we'd better set spark.locality.wait to a large value(30
 minutes), until we meet this problem:
 
  Now our application will load data from hdfs in the same spark cluster,
 it will get NODE_LOCAL and RACK_LOCAL level tasks during loading stage, if
 the tasks in loading stage have same locality level, ether NODE_LOCAL or
 RACK_LOCAL it works fine.
  But if the tasks in loading stage get mixed locality level, such as 3
 NODE_LOCAL tasks, and 2 RACK_LOCAL tasks, then the TaskSetManager of
 loading stage will submit the 3 NODE_LOCAL tasks as soon as resources were
 offered, then wait for spark.locality.wait.node, which was setted to 30
 minutes, the 2 RACK_LOCAL tasks will wait 30 minutes even though resources
 are avaliable.
 
 
  Does any one have met this problem? Do you have a nice solution?
 
 
  Thanks
 
 
 
 
  Ma chong

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org




TimSort in 1.2

2014-11-13 Thread Debasish Das
Hi,

I am noticing that the first step of Spark jobs does a TimSort in the 1.2
branch, and there is some time spent doing the TimSort. Is this assigning
the RDD blocks to different nodes based on a sort order?

Could someone please point me to a JIRA about this change so that I can read
more about it?

Thanks.
Deb


Re: Problems with spark.locality.wait

2014-11-13 Thread Nathan Kronenfeld
This sounds like it may be exactly the problem we've been having (and about
which I recently  posted on the user list).

Is there any way of monitoring its attempts to wait, give up, and try
another level?

In general, I'm trying to figure out why we can have repeated identical
jobs, the first of which will have all PROCESS_LOCAL, and the next will
have 95% PROCESS_LOCAL, and 5% ANY.


On Thu, Nov 13, 2014 at 2:20 PM, Kay Ousterhout k...@eecs.berkeley.edu
wrote:

 Hi,

 Shivaram and I stumbled across this problem a few weeks ago, and AFAIK
 there is no nice solution.  We worked around it by avoiding jobs that have
 tasks with two locality levels.

 To fix this problem, we really need to fix the underlying problem in the
 scheduling code, which currently tries to schedule all tasks at the minimum
 locality of any of the tasks in the job.  Unfortunately, this involves
 adding a bunch of complexity to the scheduling code.

 Patrick had previously convinced us that we were the only ones running into
 this problem, so it wasn't worth fixing (especially because we found an OK
 workaround for our case).  It would be useful to hear if any other folks
 have run into this problem -- it sounds like maybe we should go ahead and
 fix the scheduling code.

 I've filed a JIRA to track this:
 https://issues.apache.org/jira/browse/SPARK-4383

 -Kay

 On Wed, Nov 12, 2014 at 11:55 PM, MaChong machon...@sina.com wrote:

  Hi,
 
  We are running a time sensitive application with 70 partition and 800MB
  each parition size. The application first load data from database in
  different cluster, then apply a filter, cache the filted data, then
 apply a
  map and a reduce, finally collect results.
  The application will be finished in 20 seconds if we set
  spark.locality.wait to a large value (30 minutes). And it will use 100
  seconds, if we set spark.locality.wait a small value(less than 10
 seconds)
  We have analysed the driver log and found lot of NODE_LOCAL and
 RACK_LOCAL
  level tasks, normally a PROCESS_LOCAL task only takes 15 seconds, but
  NODE_LOCAL or RACK_LOCAL tasks will take 70 seconds.
 
  So I think we'd better set spark.locality.wait to a large value(30
  minutes), until we meet this problem:
 
  Now our application will load data from hdfs in the same spark cluster,
 it
  will get NODE_LOCAL and RACK_LOCAL level tasks during loading stage, if
 the
  tasks in loading stage have same locality level, ether NODE_LOCAL or
  RACK_LOCAL it works fine.
  But if the tasks in loading stage get mixed locality level, such as 3
  NODE_LOCAL tasks, and 2 RACK_LOCAL tasks, then the TaskSetManager of
  loading stage will submit the 3 NODE_LOCAL tasks as soon as resources
 were
  offered, then wait for spark.locality.wait.node, which was setted to 30
  minutes, the 2 RACK_LOCAL tasks will wait 30 minutes even though
 resources
  are avaliable.
 
 
  Does any one have met this problem? Do you have a nice solution?
 
 
  Thanks
 
 
 
 
  Ma chong
 




-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: About implicit rddToPairRDDFunctions

2014-11-13 Thread Shixiong Zhu
If we put the `implicit` into `package object rdd` or `object rdd`, then when
we write `rdd.groupByKey()`, because rdd is an object of RDD, the Scala
compiler will search `object rdd` (the companion object) and `package object
rdd` (the package object) by default. We don't need to import them explicitly.
Here is a post about the implicit search logic:
http://eed3si9n.com/revisiting-implicits-without-import-tax
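
A minimal sketch of what I mean, assuming we keep the same signature (this is
only an illustration, not the actual Spark source):

package org.apache.spark

import scala.reflect.ClassTag
import org.apache.spark.rdd.{PairRDDFunctions, RDD}

package object rdd {
  // Living in the rdd package object puts the conversion into RDD's implicit
  // scope, so call sites no longer need import org.apache.spark.SparkContext._
  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
    : PairRDDFunctions[K, V] = new PairRDDFunctions(rdd)
}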

To maintain compatibility, we can keep `rddToPairRDDFunctions` in
SparkContext but remove the `implicit`. The disadvantage is that there would
be two copies of the same code.




Best Regards,
Shixiong Zhu

2014-11-14 3:57 GMT+08:00 Reynold Xin r...@databricks.com:

 Do people usually important o.a.spark.rdd._ ?

 Also in order to maintain source and binary compatibility, we would need
 to keep both right?


 On Thu, Nov 6, 2014 at 3:12 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 I saw many people asked how to convert a RDD to a PairRDDFunctions. I
 would
 like to ask a question about it. Why not put the following implicit into
 pacakge object rdd or object rdd?

   implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
   (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
 = {
 new PairRDDFunctions(rdd)
   }

 If so, the converting will be automatic and not need to
 import org.apache.spark.SparkContext._

 I tried to search some discussion but found nothing.

 Best Regards,
 Shixiong Zhu





Re: Cache sparkSql data without uncompressing it in memory

2014-11-13 Thread Cheng Lian
No, the columnar buffer is built in a small batching manner; the batch size
is controlled by the spark.sql.inMemoryColumnarStorage.batchSize property.
The default value for this in master and branch-1.2 is 10,000 rows per batch.
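
For completeness, this is roughly how the two properties can be set before
caching a table (a sketch against the SQLContext API; sc is an existing
SparkContext and the table name is a placeholder):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Enable in-memory columnar compression (already the default in master and
// branch-1.2) and keep the default batch size of 10,000 rows per batch.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

// Columnar buffers are then built batch-by-batch while the table is cached.
sqlContext.cacheTable("my_table")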


On 11/14/14 1:27 AM, Sadhan Sood wrote:

Thanks Chneg, Just one more question - does that mean that we still 
need enough memory in the cluster to uncompress the data before it can 
be compressed again or does that just read the raw data as is?


On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com wrote:


Currently there’s no way to cache the compressed sequence file
directly. Spark SQL uses in-memory columnar format while caching
table rows, so we must read all the raw data and convert them into
columnar format. However, you can enable in-memory columnar
compression by setting
|spark.sql.inMemoryColumnarStorage.compressed| to |true|. This
property is already set to true by default in master branch and
branch-1.2.

On 11/13/14 7:16 AM, Sadhan Sood wrote:


We noticed while caching data from our hive tables which contain
data in compressed sequence file format that it gets uncompressed
in memory when getting cached. Is there a way to turn this off
and cache the compressed data as is ?

​



​


Re: Re: Problems with spark.locality.wait

2014-11-13 Thread MaChong
In the specific example stated, the user had two tasksets if I
understood right ... the first taskset reads off the db (dfs in your
example), does some filtering, etc., and caches the result.
The second works off the cached data (which is, now, process-local
locality level aware) to do map, group, etc.

The taskset(s) which work off the cached data would be sensitive to the
PROCESS_LOCAL locality level.
But for the initial taskset (which loaded off hdfs/database, etc.) no
tasks can be process local - since we do not have a way to specify
that in spark (which, imo, is a limitation).

Given this, the requirement seemed to be to relax the locality level for
the initial load taskset - since not scheduling on rack-local or other
nodes seems to be hurting utilization and latency when no node-local
executors are available.
But for tasksets which have process-local tasks, the user wants to ensure
that node/rack-local scheduling does not happen (based on the timeouts
and perf numbers).

Hence my suggestion on setting the individual locality level timeouts -
of course, my suggestion was highly specific to the problem as stated :-)
It is, by no means, a generalization - and I do agree we definitely do
need to address the larger scheduling issue.
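
For concreteness, a sketch of the kind of per-level configuration I had in
mind (the property names follow the configuration docs; the values are
illustrative only, and Spark 1.x interprets them as milliseconds):

import org.apache.spark.SparkConf

// Keep process-local scheduling sticky, but fall back to node/rack level
// quickly when a taskset has no process-local tasks.
val conf = new SparkConf()
  .set("spark.locality.wait.process", "600000") // effectively hold out for process-local
  .set("spark.locality.wait.node", "1000")
  .set("spark.locality.wait.rack", "1000")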

Regards,
Mridul



On Fri, Nov 14, 2014 at 2:05 AM, Kay Ousterhout k...@eecs.berkeley.edu wrote:
 Hi Mridul,

 In the case Shivaram and I saw, and based on my understanding of Ma chong's
 description, I don't think that completely fixes the problem.

 To be very concrete, suppose your job has two tasks, t1 and t2, and they
 each have input data (in HDFS) on h1 and h2, respectively, and that h1 and
 h2 are on the same rack. Suppose your Spark job gets allocated two
 executors, one on h1 and another on h3 (a different host with no input
 data).  When the job gets submitted to the task set manager (TSM),
 TSM.computeValidLocalityLevels will determine that the valid levels are
 NODE_LOCAL (because t1 could be run on the NODE_LOCAL executor on h1),
 RACK_LOCAL, ANY.  As a result, the TSM will not schedule t2 until
 spark.locality.wait.NODE_LOCAL expires, even though t2 has no hope of being
 scheduled on a NODE_LOCAL machine (because the job wasn't given any
 executors on h2).  You could set spark.locality.wait.NODE_LOCAL to be low,
 but then it might cause t1 (or more generally, in a larger job, other tasks
 that have NODE_LOCAL executors where they can be scheduled) to get scheduled
 on h3 (and not on h1).

 Is there a way you were thinking of configuring things that avoids this
 problem?

 I'm pretty sure we could fix this problem by tracking more information about
 each task in the TSM -- for example, the TSM has enough information to know
 that there are no NODE_LOCAL executors where t2 could be scheduled in the
 above example (and that the best possible locality level for t2 is
 RACK_LOCAL), and could schedule t2 right away on a RACK_LOCAL machine.  Of
 course, this would add a bunch of complexity to the TSM, hence the earlier
 decision that the added complexity may not be worth it.

 -Kay

 On Thu, Nov 13, 2014 at 12:11 PM, Mridul Muralidharan mri...@gmail.com
 wrote:

 Instead of setting spark.locality.wait, try setting individual
 locality waits specifically.

 Namely, spark.locality.wait.PROCESS_LOCAL to high value (so that
 process local tasks are always scheduled in case the task set has
 process local tasks).
 Set spark.locality.wait.NODE_LOCAL and spark.locality.wait.RACK_LOCAL
 to low value - so that in case task set has no process local tasks,
 both node local and rack local tasks are scheduled asap.

 From your description, this will alleviate the problem you mentioned.


 Kay's comment, IMO, is slightly general in nature - and I suspect
 unless we overhaul how preferred locality is specified, and allow for
 taskset specific hints for schedule, we cant resolve that IMO.


 Regards,
 Mridul



 On Thu, Nov 13, 2014 at 1:25 PM, MaChong machon...@sina.com wrote:
  Hi,
 
  We are running a time sensitive application with 70 partition and 800MB
  each parition size. The application first load data from database in
  different cluster, then apply a filter, cache the filted data, then apply a
  map and a reduce, finally collect results.
  The application will be finished in 20 seconds if we set
  spark.locality.wait to a large value (30 minutes). And it will use 100
  seconds, if we set spark.locality.wait a small value(less than 10 seconds)
  We have analysed the driver log and found lot of NODE_LOCAL and
  RACK_LOCAL level tasks, normally a PROCESS_LOCAL task only takes 15 
  seconds,
  but NODE_LOCAL or RACK_LOCAL tasks will take 70 seconds.
 
  So I think we'd better set spark.locality.wait to a large value(30
  minutes), until we meet this problem:
 
  Now our application will load data from hdfs in the same spark cluster,
  it will get NODE_LOCAL and RACK_LOCAL level tasks during loading stage, if
  the tasks in loading stage have same locality level, ether NODE_LOCAL or
  RACK_LOCAL it works 

Re: [MLlib] Contributing Algorithm for Outlier Detection

2014-11-13 Thread Meethu Mathew

Hi Ashutosh,

Please edit the README file. I think the following function call has
changed now.


model = OutlierWithAVFModel.outliers(master: String, input dir: String, percentage: Double)

Regards,

*Meethu Mathew*

*Engineer*

*Flytxt*

_http://www.linkedin.com/home?trk=hb_tab_home_top_

On Friday 14 November 2014 12:01 AM, Ashutosh wrote:

Hi Anant,

Please see the changes.

https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala


I have changed the input format to Vector of String. I think we can also make 
it generic.


Line 59  72 : that counter will not affect in parallelism, Since it only work 
on one datapoint. It  only does the Indexing of the column.


Rest all side effects have been removed.

​

Thanks,

Ashutosh





From: slcclimber [via Apache Spark Developers List] 
ml-node+s1001551n9287...@n3.nabble.com
Sent: Tuesday, November 11, 2014 11:46 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


Mayur,
Libsvm format sounds good to me. I could work on writing the tests if that 
helps you?
Anant

On Nov 11, 2014 11:06 AM, Ashutosh [via Apache Spark Developers List] [hidden 
email]/user/SendEmail.jtp?type=nodenode=9287i=0 wrote:

Hi Mayur,

Vector data types are implemented using breeze library, it is presented at

.../org/apache/spark/mllib/linalg


Anant,

One restriction I found that a vector can only be of 'Double', so it actually 
restrict the user.

What are you thoughts on LibSVM format?

Thanks for the comments, I was just trying to get away from those increment 
/decrement functions, they look ugly. Points are noted. I'll try to fix them 
soon. Tests are also required for the code.


Regards,

Ashutosh



From: Mayur Rustagi [via Apache Spark Developers List] ml-node+[hidden 
email]http://user/SendEmail.jtp?type=nodenode=9286i=0
Sent: Saturday, November 8, 2014 12:52 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


We should take a vector instead giving the user flexibility to decide
data source/ type

What do you mean by vector datatype exactly?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Wed, Nov 5, 2014 at 6:45 AM, slcclimber [hidden 
email]http://user/SendEmail.jtp?type=nodenode=9239i=0 wrote:


Ashutosh,
I still see a few issues.
1. On line 112 you are counting using a counter. Since this will happen in
a RDD the counter will cause issues. Also that is not good functional style
to use a filter function with a side effect.
You could use randomSplit instead. This does not the same thing without the
side effect.
2. Similar shared usage of j in line 102 is going to be an issue as well.
also hash seed does not need to be sequential it could be randomly
generated or hashed on the values.
3. The compute function and trim scores still runs on a comma separeated
RDD. We should take a vector instead giving the user flexibility to decide
data source/ type. what if we want data from hive tables or parquet or JSON
or avro formats. This is a very restrictive format. With vectors the user
has the choice of taking in whatever data format and converting them to
vectors insteda of reading json files creating a csv file and then workig
on that.
4. Similar use of counters in 54 and 65 is an issue.
Basically the shared state counters is a huge issue that does not scale.
Since the processing of RDD's is distributed and the value j lives on the
master.

Anant



On Tue, Nov 4, 2014 at 7:22 AM, Ashutosh [via Apache Spark Developers List]
[hidden email]http://user/SendEmail.jtp?type=nodenode=9239i=1 wrote:


  Anant,

I got rid of those increment/ decrements functions and now code is much
cleaner. Please check. All your comments have been looked after.




https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala


  _Ashu



https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala

   Outlier-Detection-with-AVF-Spark/OutlierWithAVFModel.scala at master ·
codeAshu/Outlier-Detection-with-AVF-Spark · GitHub
  Contribute to Outlier-Detection-with-AVF-Spark development by creating

an

account on GitHub.
  Read more...


https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala


  --
*From:* slcclimber [via Apache Spark Developers List] ml-node+[hidden
email] http://user/SendEmail.jtp?type=nodenode=9083i=0
*Sent:* Friday, October 31, 2014 10:09 AM
*To:* Ashutosh Trivedi (MT2013030)
*Subject:* Re: [MLlib] Contributing Algorithm for Outlier Detection


You should create a jira ticket to go with it as well.
Thanks
On Oct 30, 2014 10:38 PM, Ashutosh [via Apache Spark 

Re: About implicit rddToPairRDDFunctions

2014-11-13 Thread Reynold Xin
That seems like a great idea. Can you submit a pull request?


On Thu, Nov 13, 2014 at 7:13 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 If we put the `implicit` into pacakge object rdd or object rdd, when
 we write `rdd.groupbykey()`, because rdd is an object of RDD, Scala
 compiler will search `object rdd`(companion object) and `package object 
 rdd`(pacakge
 object) by default. We don't need to import them explicitly. Here is a
 post about the implicit search logic:
 http://eed3si9n.com/revisiting-implicits-without-import-tax

 To maintain the compatibility, we can keep `rddToPairRDDFunctions` in the
 SparkContext but remove `implicit`. The disadvantage is there are two
 copies of same codes.




 Best Regards,
 Shixiong Zhu

 2014-11-14 3:57 GMT+08:00 Reynold Xin r...@databricks.com:

 Do people usually important o.a.spark.rdd._ ?

 Also in order to maintain source and binary compatibility, we would need
 to keep both right?


 On Thu, Nov 6, 2014 at 3:12 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 I saw many people asked how to convert a RDD to a PairRDDFunctions. I
 would
 like to ask a question about it. Why not put the following implicit into
 pacakge object rdd or object rdd?

   implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
   (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] =
 null)
 = {
 new PairRDDFunctions(rdd)
   }

 If so, the converting will be automatic and not need to
 import org.apache.spark.SparkContext._

 I tried to search some discussion but found nothing.

 Best Regards,
 Shixiong Zhu






Re: About implicit rddToPairRDDFunctions

2014-11-13 Thread Shixiong Zhu
OK. I'll take it.

Best Regards,
Shixiong Zhu

2014-11-14 12:34 GMT+08:00 Reynold Xin r...@databricks.com:

 That seems like a great idea. Can you submit a pull request?


 On Thu, Nov 13, 2014 at 7:13 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 If we put the `implicit` into `package object rdd` or `object rdd`, then when
 we write `rdd.groupByKey()`, because rdd is an instance of RDD, the Scala
 compiler will search `object rdd` (the companion object) and `package object
 rdd` (the package object) by default. We don't need to import them explicitly.
 Here is a post about the implicit search logic:
 http://eed3si9n.com/revisiting-implicits-without-import-tax

 To maintain compatibility, we can keep `rddToPairRDDFunctions` in
 SparkContext but remove `implicit`. The disadvantage is that there are
 two copies of the same code.




 Best Regards,
 Shixiong Zhu

 2014-11-14 3:57 GMT+08:00 Reynold Xin r...@databricks.com:

 Do people usually import o.a.spark.rdd._ ?

 Also, in order to maintain source and binary compatibility, we would need
 to keep both, right?


 On Thu, Nov 6, 2014 at 3:12 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 I saw many people ask how to convert an RDD to a PairRDDFunctions. I would
 like to ask a question about it. Why not put the following implicit into
 `package object rdd` or `object rdd`?

   implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
       (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
     new PairRDDFunctions(rdd)
   }

 If so, the conversion will be automatic and there will be no need to
 import org.apache.spark.SparkContext._

 I tried to search some discussion but found nothing.

 Best Regards,
 Shixiong Zhu







Re: [MLlib] Contributing Algorithm for Outlier Detection

2014-11-13 Thread Meethu Mathew


Hi,

I have a doubt regarding the input to your algorithm.

val model = OutlierWithAVFModel.outliers(data: RDD[Vector[String]],
percent: Double, sc: SparkContext)



Here our input data is an RDD[Vector[String]]. How can we create this
RDD from a file? sc.textFile will simply give us an RDD[String]; how do we
turn its elements into Vector[String]?


Could you please share a code snippet of this conversion if you have one.


Regards,
Meethu Mathew

On Friday 14 November 2014 10:02 AM, Meethu Mathew wrote:

Hi Ashutosh,

Please edit the README file. I think the following function call has
changed now.

model = OutlierWithAVFModel.outliers(master: String, input dir: String, percentage: Double)

Regards,

*Meethu Mathew*

*Engineer*

*Flytxt*

_http://www.linkedin.com/home?trk=hb_tab_home_top_

On Friday 14 November 2014 12:01 AM, Ashutosh wrote:

Hi Anant,

Please see the changes.

https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala


I have changed the input format to Vector of String. I think we can also make 
it generic.


Lines 59 & 72: that counter will not affect parallelism, since it only works
on one data point. It only does the indexing of the column.
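
For what it is worth, a side-effect-free way to index the columns of a row is
Scala's own zipWithIndex; a small sketch (the name `row` is only illustrative,
not taken from the actual code):

  // Pair each column value with its position, without any mutable counter.
  val row: Vector[String] = Vector("a", "b", "c")
  val indexed: Vector[(String, Int)] = row.zipWithIndex  // Vector((a,0), (b,1), (c,2))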


All the other side effects have been removed.

​

Thanks,

Ashutosh





From: slcclimber [via Apache Spark Developers List] 
ml-node+s1001551n9287...@n3.nabble.com
Sent: Tuesday, November 11, 2014 11:46 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


Mayur,
LibSVM format sounds good to me. I could work on writing the tests, if that
helps you.
Anant

On Nov 11, 2014 11:06 AM, Ashutosh [via Apache Spark Developers List] [hidden 
email] wrote:

Hi Mayur,

Vector data types are implemented using the Breeze library; it is presented at

.../org/apache/spark/mllib/linalg


Anant,

One restriction I found is that a vector can only be of 'Double', so it
actually restricts the user.

What are your thoughts on the LibSVM format?
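
(For reference, a minimal sketch of loading LibSVM-format data with MLlib's
existing MLUtils.loadLibSVMFile helper, assuming a SparkContext named sc; the
file path is only illustrative:)

  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  // Each line "label index1:value1 index2:value2 ..." becomes a LabeledPoint
  // whose features are a Vector of Double.
  val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")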

Thanks for the comments. I was just trying to get away from those
increment/decrement functions; they look ugly. Points are noted. I'll try to
fix them soon. Tests are also required for the code.


Regards,

Ashutosh



From: Mayur Rustagi [via Apache Spark Developers List] ml-node+[hidden 
email]
Sent: Saturday, November 8, 2014 12:52 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


We should take a vector instead, giving the user flexibility to decide
data source/type

What do you mean by vector datatype exactly?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


On Wed, Nov 5, 2014 at 6:45 AM, slcclimber [hidden 
email] wrote:


Ashutosh,
I still see a few issues.
1. On line 112 you are counting using a counter. Since this will happen in
an RDD, the counter will cause issues. It is also not good functional style
to use a filter function with a side effect.
You could use randomSplit instead; it does the same thing without the
side effect (see the sketch after this list).
2. The similar shared usage of j on line 102 is going to be an issue as well.
Also, the hash seed does not need to be sequential; it could be randomly
generated or hashed on the values.
3. The compute function and trim scores still run on a comma-separated
RDD. We should take a vector instead, giving the user flexibility to decide
the data source/type. What if we want data from Hive tables or Parquet, JSON,
or Avro formats? This is a very restrictive format. With vectors the user
has the choice of taking in whatever data format they like and converting it to
vectors, instead of reading JSON files, creating a CSV file, and then working
on that.
4. The similar use of counters on lines 54 and 65 is an issue.
Basically the shared-state counters are a huge issue that does not scale,
since the processing of RDDs is distributed and the value j lives on the
master.
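
As a concrete illustration of the randomSplit suggestion in point 1, a minimal
sketch (the weights, seed, and the name `data` are only illustrative):

  // Split the RDD into two parts without any shared counter or side effect.
  val Array(sample, rest) = data.randomSplit(Array(0.1, 0.9), seed = 42L)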

Anant



On Tue, Nov 4, 2014 at 7:22 AM, Ashutosh [via Apache Spark Developers List]
[hidden email] wrote:


   Anant,

I got rid of those increment/decrement functions and now the code is much
cleaner. Please check. All your comments have been addressed.




https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala

   _Ashu




Re: [MLlib] Contributing Algorithm for Outlier Detection

2014-11-13 Thread Ashutosh
Please use the following snippet. I am still working on making it a generic
vector, so that the input does not always have to be Vector[String]. But
String will work fine for now.


def main(args: Array[String]) {
  val sc = new SparkContext("local", "OutlierDetection")
  val dir = "hdfs://localhost:54310/train3"  // your file path

  val data = sc.textFile(dir).map(word => word.split(",").toVector)
  val model = OutlierWithAVFModel.outliers(data, 20, sc)

  model.score.saveAsTextFile("../scores")
  model.trimmed_data.saveAsTextFile(".../trimmed")
}
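
(As a side note on making it generic: if numeric features were wanted instead
of strings, the same load line could map each value to Double. This is only a
sketch, not part of the current code.)

  val numericData = sc.textFile(dir).map(_.split(",").toVector.map(_.toDouble))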



From: Meethu Mathew-2 [via Apache Spark Developers List] 
ml-node+s1001551n9352...@n3.nabble.com
Sent: Friday, November 14, 2014 11:42 AM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


Hi,

I have a doubt regarding the input to your algorithm.

val model = OutlierWithAVFModel.outliers(data: RDD[Vector[String]],
percent: Double, sc: SparkContext)


Here our input data is an RDD[Vector[String]]. How can we create this
RDD from a file? sc.textFile will simply give us an RDD[String]; how do we
turn its elements into Vector[String]?


Could you please share a code snippet of this conversion if you have one.


Regards,
Meethu Mathew

On Friday 14 November 2014 10:02 AM, Meethu Mathew wrote:

 Hi Ashutosh,

 Please edit the README file. I think the following function call has
 changed now.

 model = OutlierWithAVFModel.outliers(master: String, input dir: String, percentage: Double)

 Regards,

 *Meethu Mathew*

 *Engineer*

 *Flytxt*

 _http://www.linkedin.com/home?trk=hb_tab_home_top_

 On Friday 14 November 2014 12:01 AM, Ashutosh wrote:
 Hi Anant,

 Please see the changes.

 https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala


 I have changed the input format to Vector of String. I think we can also 
 make it generic.


 Lines 59 & 72: that counter will not affect parallelism, since it only works
 on one data point. It only does the indexing of the column.


 All the other side effects have been removed.

 ​

 Thanks,

 Ashutosh




 
 From: slcclimber [via Apache Spark Developers List] [hidden 
 email]
 Sent: Tuesday, November 11, 2014 11:46 PM
 To: Ashutosh Trivedi (MT2013030)
 Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection


 Mayur,
 LibSVM format sounds good to me. I could work on writing the tests, if that
 helps you.
 Anant

 On Nov 11, 2014 11:06 AM, Ashutosh [via Apache Spark Developers List] 
 [hidden email] wrote:

 Hi Mayur,

 Vector data types are implemented using the Breeze library; it is presented at

 .../org/apache/spark/mllib/linalg


 Anant,

 One restriction I found is that a vector can only be of 'Double', so it
 actually restricts the user.

 What are your thoughts on the LibSVM format?

 Thanks for the comments. I was just trying to get away from those
 increment/decrement functions; they look ugly. Points are noted. I'll try to
 fix them soon. Tests are also required for the code.


 Regards,

 Ashutosh


 
 From: Mayur Rustagi [via Apache Spark Developers List] ml-node+[hidden 
 email]
 Sent: Saturday, November 8, 2014 12:52 PM
 To: Ashutosh Trivedi (MT2013030)
 Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection

 We should take a vector instead, giving the user flexibility to decide
 data source/type
 What do you mean by vector datatype exactly?

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi


 On Wed, Nov 5, 2014 at 6:45 AM, slcclimber [hidden 
 email] wrote:

 Ashutosh,
 I still see a few issues.
 1. On line 112 you are counting using a counter. Since this will happen in
 an RDD, the counter will cause issues. It is also not good functional style
 to use a filter function with a side effect.
 You could use randomSplit instead; it does the same thing without the
 side effect.
 2. The similar shared usage of j on line 102 is going to be an issue as well.
 Also, the hash seed does not need to be sequential; it could be randomly
 generated or hashed on the values.
 3. The compute function and trim scores still run on a comma-separated
 RDD. We should take a vector instead, giving the user flexibility to decide
 the data source/type. What if we want data from Hive tables or Parquet, JSON,
 or Avro formats? This is a very restrictive format. With vectors the user
 has the choice of taking in whatever data format they like and converting it to
 vectors, instead of reading JSON files, creating a CSV file, and then working
 on that.
 4. The similar use of counters on lines 54 and 65 is an issue.