Problems with spark.locality.wait
Hi,

We are running a time-sensitive application with 70 partitions, each about 800 MB. The application first loads data from a database in a different cluster, then applies a filter, caches the filtered data, then applies a map and a reduce, and finally collects results.

The application finishes in 20 seconds if we set spark.locality.wait to a large value (30 minutes), but takes 100 seconds if we set spark.locality.wait to a small value (less than 10 seconds). We have analysed the driver log and found a lot of NODE_LOCAL and RACK_LOCAL level tasks; normally a PROCESS_LOCAL task takes only 15 seconds, but a NODE_LOCAL or RACK_LOCAL task takes 70 seconds. So we thought it best to set spark.locality.wait to a large value (30 minutes), until we met this problem:

Now our application loads data from HDFS in the same Spark cluster, so it gets NODE_LOCAL and RACK_LOCAL level tasks during the loading stage. If the tasks in the loading stage all have the same locality level, either NODE_LOCAL or RACK_LOCAL, it works fine. But if the tasks in the loading stage get mixed locality levels, such as 3 NODE_LOCAL tasks and 2 RACK_LOCAL tasks, then the TaskSetManager of the loading stage will submit the 3 NODE_LOCAL tasks as soon as resources are offered, then wait for spark.locality.wait.node, which we set to 30 minutes, so the 2 RACK_LOCAL tasks will wait 30 minutes even though resources are available.

Has anyone else met this problem? Do you have a nice solution?

Thanks

Ma chong
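For reference, the per-level waits can be tuned independently of the top-level spark.locality.wait. Below is a minimal Scala sketch of what such a configuration might look like, assuming the goal is to stay patient for PROCESS_LOCAL slots while falling back to NODE_LOCAL/RACK_LOCAL quickly; the values are illustrative only (milliseconds), not a recommendation from this thread:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative values only: wait a long time for PROCESS_LOCAL slots,
    // but fall back to NODE_LOCAL / RACK_LOCAL scheduling almost immediately.
    val conf = new SparkConf()
      .setAppName("locality-wait-example")
      .set("spark.locality.wait.process", "1800000") // 30 minutes
      .set("spark.locality.wait.node", "3000")       // 3 seconds
      .set("spark.locality.wait.rack", "3000")       // 3 seconds
    val sc = new SparkContext(conf)

As the replies in this thread discuss, this alone does not solve the mixed NODE_LOCAL/RACK_LOCAL loading-stage case described above.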
Re: [VOTE] Release Apache Spark 1.1.1 (RC1)
LICENSE and NOTICE are fine. Signature and checksum are fine. I unzipped and built the plain source distribution, which built. However, I am seeing a consistent test failure with mvn -DskipTests clean package; mvn test. In the Hive module:

- SET commands semantics for a HiveContext *** FAILED *** Expected Array(spark.sql.key.usedfortestonly=test.val.0, spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0), but got Array(spark.sql.key.usedfortestonlyspark.sql.key.usedfortestonly=test.val.0test.val.0, spark.sql.key.usedfortestonly=test.val.0) (HiveQuerySuite.scala:544)

Anyone else seeing this?

On Thu, Nov 13, 2014 at 8:18 AM, Krishna Sankar ksanka...@gmail.com wrote:

+1
1. Compiled OSX 10.10 (Yosemite) mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package (10:49 min)
2. Tested pyspark, MLlib
2.1. statistics OK
2.2. Linear/Ridge/Lasso Regression OK
2.3. Decision Tree, Naive Bayes OK
2.4. KMeans OK
2.5. rdd operations OK
2.6. recommendation OK
2.7. Good work! In 1.1.0, there was an error and my program used to hang (over memory allocation) consistently while running validation using itertools to compute optimum rank, lambda, numofiterations/rmse; data: MovieLens medium dataset (1 million records). It works well in 1.1.1!
Cheers
k/
P.S: Missed Reply all, first time

On Wed, Nov 12, 2014 at 8:35 PM, Andrew Or and...@databricks.com wrote:

I will start the vote with a +1

2014-11-12 20:34 GMT-08:00 Andrew Or and...@databricks.com:

Please vote on releasing the following candidate as Apache Spark version 1.1.1. This release fixes a number of bugs in Spark 1.1.0. Some of the notable ones are:
- [SPARK-3426] Sort-based shuffle compression settings are incompatible
- [SPARK-3948] Stream corruption issues in sort-based shuffle
- [SPARK-4107] Incorrect handling of Channel.read() led to data truncation

The full list is at http://s.apache.org/z9h and in the CHANGES.txt attached.

The tag to be voted on is v1.1.1-rc1 (commit 72a4fdbe): http://s.apache.org/cZC

The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~andrewor14/spark-1.1.1-rc1/

Release artifacts are signed with the following key: https://people.apache.org/keys/committer/andrewor14.asc

The staging repository for this release can be found at: https://repository.apache.org/content/repositories/orgapachespark-1034/

The documentation corresponding to this release can be found at: http://people.apache.org/~andrewor14/spark-1.1.1-rc1-docs/

Please vote on releasing this package as Apache Spark 1.1.1! The vote is open until Sunday, November 16, at 04:30 UTC and passes if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.1.1
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see http://spark.apache.org/

Cheers,
Andrew
Join operator in PySpark
Hi all,

I have noticed that the “Join” operator in PySpark is implemented using the union and groupByKey operators instead of the cogroup operator; this change will probably generate more shuffle stages. For example:

    rdd1 = sc.makeRDD(...).partitionBy(2)
    rdd2 = sc.makeRDD(...).partitionBy(2)
    rdd3 = rdd1.join(rdd2).collect()

The code above generates 2 shuffles when implemented in Scala, but 3 shuffles in Python. What was the initial design motivation for the join operator in PySpark? Any ideas to improve join performance in PySpark?

Andrew
Re: Cache sparkSql data without uncompressing it in memory
Thanks Cheng. Just one more question - does that mean that we still need enough memory in the cluster to uncompress the data before it can be compressed again, or does it just read the raw data as is?

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote:

Currently there’s no way to cache the compressed sequence file directly. Spark SQL uses an in-memory columnar format while caching table rows, so we must read all the raw data and convert it into columnar format. However, you can enable in-memory columnar compression by setting spark.sql.inMemoryColumnarStorage.compressed to true. This property is already set to true by default in master branch and branch-1.2.

On 11/13/14 7:16 AM, Sadhan Sood wrote:

We noticed while caching data from our Hive tables, which contain data in compressed sequence file format, that it gets uncompressed in memory when getting cached. Is there a way to turn this off and cache the compressed data as is?
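As a concrete illustration of Cheng's suggestion, here is a minimal Scala sketch, assuming an existing SparkContext sc and a hypothetical Hive table named "events" stored as compressed SequenceFiles (the table name is illustrative, not from the thread):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // Compress the in-memory columnar buffers (already the default in master / branch-1.2).
    hiveContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
    // Rows are read, decompressed, and re-encoded into compressed columnar batches while caching.
    hiveContext.cacheTable("events")
    hiveContext.sql("SELECT COUNT(*) FROM events").collect()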
Re: [NOTICE] [BUILD] Minor changes to Spark's build
Hello there,

So I just took a quick look at the pom and I see two problems with it.

- activeByDefault does not work like you think it does. It only activates the profile by default if you do not explicitly activate other profiles. So if you do mvn package, scala-2.10 will be activated; but if you do mvn -Pyarn package, it will not.
- You need to duplicate the activation stuff everywhere the profile is declared, not just in the root pom. (I spent quite some time yesterday fighting a similar issue...)

My suggestion here is to change the activation of scala-2.10 to look like this:

  <activation>
    <property>
      <name>!scala-2.11</name>
    </property>
  </activation>

And change the scala-2.11 profile to do this:

  <properties>
    <scala-2.11>true</scala-2.11>
  </properties>

I haven't tested, but in my experience this will activate the scala-2.10 profile by default, unless you explicitly activate the 2.11 profile, in which case that property will be set and scala-2.10 will not activate. If you look at examples/pom.xml, that's the same strategy used to choose which hbase profile to activate.

Ah, and just to reinforce, the activation logic needs to be copied to other places (e.g. examples/pom.xml, repl/pom.xml, and any other place that has scala-2.x profiles).

On Wed, Nov 12, 2014 at 11:14 PM, Patrick Wendell pwend...@gmail.com wrote:

I actually do agree with this - let's see if we can find a solution that doesn't regress this behavior. Maybe we can simply move the one Kafka example into its own project instead of having it in the examples project.

On Wed, Nov 12, 2014 at 11:07 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

Currently there are no mandatory profiles required to build Spark, i.e. mvn package just works. It seems sad that we would need to break this.

On Wed, Nov 12, 2014 at 10:59 PM, Patrick Wendell pwend...@gmail.com wrote:

I think printing an error that says -Pscala-2.10 must be enabled is probably okay. It's a slight regression but it's super obvious to users. That could be a more elegant solution than the somewhat complicated monstrosity I proposed on the JIRA.

On Wed, Nov 12, 2014 at 10:37 PM, Prashant Sharma scrapco...@gmail.com wrote:

One thing we can do is print a helpful error and break. I don't know how this can be done, but since we can now write Groovy inside the Maven build we have more control. (Yay!!)

Prashant Sharma

On Thu, Nov 13, 2014 at 12:05 PM, Patrick Wendell pwend...@gmail.com wrote:

Yeah, Sandy and I were chatting about this today and didn't realize -Pscala-2.10 was mandatory. This is a fairly invasive change, so I was thinking maybe we could try to remove that. Also, if someone doesn't give -Pscala-2.10 it fails in a way that is initially silent, which is bad because most people won't know to do this. https://issues.apache.org/jira/browse/SPARK-4375

On Wed, Nov 12, 2014 at 10:29 PM, Prashant Sharma scrapco...@gmail.com wrote:

Thanks Patrick. I have one suggestion: we should make passing -Pscala-2.10 mandatory for Maven users. I am sorry for not mentioning this before. There is no way around passing that option for Maven users (only). However, this is unnecessary for sbt users because it is added automatically if -Pscala-2.11 is absent.

Prashant Sharma

On Wed, Nov 12, 2014 at 3:53 PM, Sean Owen so...@cloudera.com wrote:

- Tip: when you rebase, IntelliJ will temporarily think things like the Kafka module are being removed. Say 'no' when it asks if you want to remove them.
- Can we go straight to Scala 2.11.4?
On Wed, Nov 12, 2014 at 5:47 AM, Patrick Wendell pwend...@gmail.com wrote:

Hey All,

I've just merged a patch that adds support for Scala 2.11, which will have some minor implications for the build. These are due to the complexities of supporting two versions of Scala in a single project.

1. The JDBC server will now require a special flag to build, -Phive-thriftserver, on top of the existing flag -Phive. This is because some build permutations (only in Scala 2.11) won't support the JDBC server yet due to transitive dependency conflicts.

2. The build now uses non-standard source layouts in a few additional places (we already did this for the Hive project) - the repl and the examples modules. This is just fine for maven/sbt, but it may affect users who import the build in IDEs that are using these projects and want to build Spark from the IDE. I'm going to update our wiki to include full instructions for making this work well in IntelliJ.

If there are any other build-related issues please respond to this thread and we'll make sure they get sorted out. Thanks to Prashant Sharma who is the author of this feature!

- Patrick
Re: [MLlib] Contributing Algorithm for Outlier Detection
Hi Anant,

Please see the changes: https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala

I have changed the input format to Vector of String. I think we can also make it generic.

Lines 59-72: that counter will not affect parallelism, since it only works on one data point; it only does the indexing of the column. All other side effects have been removed.

Thanks,
Ashutosh

From: slcclimber [via Apache Spark Developers List] ml-node+s1001551n9287...@n3.nabble.com
Sent: Tuesday, November 11, 2014 11:46 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection

Mayur, Libsvm format sounds good to me. I could work on writing the tests if that helps you?
Anant

On Nov 11, 2014 11:06 AM, Ashutosh [via Apache Spark Developers List] [hidden email] wrote:

Hi Mayur,

Vector data types are implemented using the breeze library; it is presented at .../org/apache/spark/mllib/linalg

Anant, one restriction I found is that a vector can only be of 'Double', so it actually restricts the user. What are your thoughts on LibSVM format?

Thanks for the comments. I was just trying to get away from those increment/decrement functions; they look ugly. Points are noted. I'll try to fix them soon. Tests are also required for the code.

Regards,
Ashutosh

From: Mayur Rustagi [via Apache Spark Developers List] ml-node+[hidden email]
Sent: Saturday, November 8, 2014 12:52 PM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection

"We should take a vector instead, giving the user flexibility to decide data source/type" - what do you mean by vector datatype exactly?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Wed, Nov 5, 2014 at 6:45 AM, slcclimber [hidden email] wrote:

Ashutosh,

I still see a few issues.
1. On line 112 you are counting using a counter. Since this will happen in an RDD the counter will cause issues. Also, it is not good functional style to use a filter function with a side effect. You could use randomSplit instead; this does the same thing without the side effect.
2. Similar shared usage of j in line 102 is going to be an issue as well. Also, the hash seed does not need to be sequential; it could be randomly generated or hashed on the values.
3. The compute function and trim scores still run on a comma-separated RDD. We should take a vector instead, giving the user flexibility to decide data source/type. What if we want data from Hive tables or Parquet or JSON or Avro formats? This is a very restrictive format. With vectors the user has the choice of taking in whatever data format and converting it to vectors instead of reading JSON files, creating a CSV file and then working on that.
4. Similar use of counters in 54 and 65 is an issue. Basically the shared-state counter is a huge issue that does not scale, since the processing of RDDs is distributed and the value j lives on the master.

Anant

On Tue, Nov 4, 2014 at 7:22 AM, Ashutosh [via Apache Spark Developers List] [hidden email] wrote:

Anant, I got rid of those increment/decrement functions and now the code is much cleaner. Please check. All your comments have been looked after.
https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala

_Ashu

From: slcclimber [via Apache Spark Developers List] ml-node+[hidden email]
Sent: Friday, October 31, 2014 10:09 AM
To: Ashutosh Trivedi (MT2013030)
Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection

You should create a jira ticket to go with it as well.
Thanks

On Oct 30, 2014 10:38 PM, Ashutosh [via Apache Spark Developers List] [hidden email] wrote:

Okay. I'll try it and post it soon with test case. After that I think we can go ahead with the PR.
Re: [NOTICE] [BUILD] Minor changes to Spark's build
Hey Patrick,

On Thu, Nov 13, 2014 at 10:49 AM, Patrick Wendell pwend...@gmail.com wrote:

I'm not sure chaining activation works like that. At least in my experience, activation based on properties only works for properties explicitly specified at the command line rather than declared elsewhere in the pom.

That's true, but note the code I posted activates a profile based on the lack of a property being set, which is why it works. Granted, I did not test that if you activate the other profile, the one with the property check will be disabled.

In any case, I think Prashant just didn't document that his patch required -Pscala-2.10 explicitly, which is what he said further up in the thread. And Sandy has a solution that has better behavior than that, which is nice.

Yeah, I saw Sandy's patch now and it's probably a better solution (since it doesn't abuse the sort of tricky maven profile stuff as much).

-- Marcelo
Re: [NOTICE] [BUILD] Minor changes to Spark's build
On Thu, Nov 13, 2014 at 10:58 AM, Patrick Wendell pwend...@gmail.com wrote:

That's true, but note the code I posted activates a profile based on the lack of a property being set, which is why it works. Granted, I did not test that if you activate the other profile, the one with the property check will be disabled.

Ah yeah, good call - so then we'd trigger 2.11-vs-not based on the presence of -Dscala-2.11. Would that fix this issue then? It might be a simpler fix to merge into the 1.2 branch than Sandy's patch since we're pretty late in the game (though that patch does other things separately that I'd like to see end up in Spark soon).

Yeah, that's the idea. As for simplicity, I think Sandy's patch would be just as simple if it avoided all the changes to isolate the examples / external stuff into different profiles.

-- Marcelo
Re: Problems with spark.locality.wait
Hi,

Shivaram and I stumbled across this problem a few weeks ago, and AFAIK there is no nice solution. We worked around it by avoiding jobs that have tasks with two locality levels.

To fix this problem, we really need to fix the underlying problem in the scheduling code, which currently tries to schedule all tasks at the minimum locality of any of the tasks in the job. Unfortunately, this involves adding a bunch of complexity to the scheduling code. Patrick had previously convinced us that we were the only ones running into this problem, so it wasn't worth fixing (especially because we found an OK workaround for our case). It would be useful to hear if any other folks have run into this problem -- it sounds like maybe we should go ahead and fix the scheduling code.

I've filed a JIRA to track this: https://issues.apache.org/jira/browse/SPARK-4383

-Kay

On Wed, Nov 12, 2014 at 11:55 PM, MaChong machon...@sina.com wrote: ...
Re: [VOTE] Release Apache Spark 1.1.1 (RC1)
Hey Sean,

Thanks for pointing this out. Looks like a bad test where we should be doing Set comparison instead of Array.

Michael

On Thu, Nov 13, 2014 at 2:05 AM, Sean Owen so...@cloudera.com wrote: ...
Re: About implicit rddToPairRDDFunctions
Do people usually import o.a.spark.rdd._? Also, in order to maintain source and binary compatibility, we would need to keep both, right?

On Thu, Nov 6, 2014 at 3:12 AM, Shixiong Zhu zsxw...@gmail.com wrote:

I saw many people ask how to convert an RDD to a PairRDDFunctions. I would like to ask a question about it. Why not put the following implicit into package object rdd or object RDD?

    implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
      new PairRDDFunctions(rdd)
    }

If so, the conversion will be automatic and there will be no need to import org.apache.spark.SparkContext._

I tried to search for some discussion but found nothing.

Best Regards,
Shixiong Zhu
Re: [VOTE] Release Apache Spark 1.1.1 (RC1)
Yeah, this seems to be somewhat environment-specific too. The same test has been passing here for a while: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.1-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/lastBuild/consoleFull

2014-11-13 11:26 GMT-08:00 Michael Armbrust mich...@databricks.com: ...
Re: Join operator in PySpark
We should implement this using cogroup(); it will just require some tracking to map Python partitioners into dummy Java ones so that Java Spark’s cogroup() operator respects Python’s partitioning. I’m sure that there are some other subtleties, particularly if we mix datasets that use different serializers / Java object representations. There’s a longstanding JIRA to fix this: https://issues.apache.org/jira/browse/SPARK-655

On November 13, 2014 at 5:08:15 AM, 夏俊鸾 (xiajunl...@gmail.com) wrote: ...
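For context, this is roughly how a join can be expressed on top of cogroup on the Scala side, which is why co-partitioned inputs there need only the cogroup stage. A simplified, illustrative Scala sketch (not the actual PairRDDFunctions implementation, which also threads through partitioners and serializers):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._ // pair-RDD implicits in Spark 1.x

    val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(2))
    val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(new HashPartitioner(2))

    // join expressed via cogroup: both sides already share a partitioner,
    // so no extra shuffle is needed beyond the cogroup itself
    val joined = rdd1.cogroup(rdd2).flatMapValues { case (vs, ws) =>
      for (v <- vs; w <- ws) yield (v, w)
    }
    joined.collect() // Array((1,(a,x)), (2,(b,y))), order may vary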
Re: [VOTE] Release Apache Spark 1.1.1 (RC1)
Ah right. This is because I'm running Java 8. This was fixed in SPARK-3329 (https://github.com/apache/spark/commit/2b7ab814f9bde65ebc57ebd04386e56c97f06f4a#diff-7bfd8d7c8cbb02aa0023e4c3497ee832). Consider back-porting it if other reasons arise, but this is specific to tests and to Java 8.

On Thu, Nov 13, 2014 at 8:01 PM, Andrew Or and...@databricks.com wrote: ...
Re: Problems with spark.locality.wait
Hi Mridul,

In the case Shivaram and I saw, and based on my understanding of Ma chong's description, I don't think that completely fixes the problem.

To be very concrete, suppose your job has two tasks, t1 and t2, and they each have input data (in HDFS) on h1 and h2, respectively, and that h1 and h2 are on the same rack. Suppose your Spark job gets allocated two executors, one on h1 and another on h3 (a different host with no input data). When the job gets submitted to the task set manager (TSM), TSM.computeValidLocalityLevels will determine that the valid levels are NODE_LOCAL (because t1 could be run on the NODE_LOCAL executor on h1), RACK_LOCAL, ANY. As a result, the TSM will not schedule t2 until spark.locality.wait.NODE_LOCAL expires, even though t2 has no hope of being scheduled on a NODE_LOCAL machine (because the job wasn't given any executors on h2). You could set spark.locality.wait.NODE_LOCAL to be low, but then it might cause t1 (or, more generally, in a larger job, other tasks that have NODE_LOCAL executors where they can be scheduled) to get scheduled on h3 (and not on h1). Is there a way you were thinking of configuring things that avoids this problem?

I'm pretty sure we could fix this problem by tracking more information about each task in the TSM -- for example, the TSM has enough information to know that there are no NODE_LOCAL executors where t2 could be scheduled in the above example (and that the best possible locality level for t2 is RACK_LOCAL), and could schedule t2 right away on a RACK_LOCAL machine. Of course, this would add a bunch of complexity to the TSM, hence the earlier decision that the added complexity may not be worth it.

-Kay

On Thu, Nov 13, 2014 at 12:11 PM, Mridul Muralidharan mri...@gmail.com wrote:

Instead of setting spark.locality.wait, try setting the individual locality waits specifically. Namely, set spark.locality.wait.PROCESS_LOCAL to a high value (so that process-local tasks are always scheduled in case the task set has process-local tasks). Set spark.locality.wait.NODE_LOCAL and spark.locality.wait.RACK_LOCAL to a low value - so that in case the task set has no process-local tasks, both node-local and rack-local tasks are scheduled asap.

From your description, this will alleviate the problem you mentioned. Kay's comment, IMO, is slightly general in nature - and I suspect that unless we overhaul how preferred locality is specified, and allow for taskset-specific hints for scheduling, we can't resolve that.

Regards,
Mridul

On Thu, Nov 13, 2014 at 1:25 PM, MaChong machon...@sina.com wrote: ...
TimSort in 1.2
Hi,

I am noticing that the first step for Spark jobs does a TimSort in the 1.2 branch, and there is some time spent doing the TimSort. Is this assigning the RDD blocks to different nodes based on a sort order? Could someone please point me to a JIRA about this change so that I can read more about it?

Thanks.
Deb
Re: Problems with spark.locality.wait
This sounds like it may be exactly the problem we've been having (and about which I recently posted on the user list). Is there any way of monitoring its attempts to wait, giving up, and trying another level?

In general, I'm trying to figure out why we can have repeated identical jobs, the first of which will have all PROCESS_LOCAL tasks, and the next will have 95% PROCESS_LOCAL and 5% ANY.

On Thu, Nov 13, 2014 at 2:20 PM, Kay Ousterhout k...@eecs.berkeley.edu wrote: ...

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
Re: About implicit rddToPairRDDFunctions
If we put the `implicit` into `package object rdd` or `object RDD`, then when we write `rdd.groupByKey()`, because rdd is an instance of RDD, the Scala compiler will search `object RDD` (the companion object) and `package object rdd` (the package object) by default. We don't need to import them explicitly. Here is a post about the implicit search logic: http://eed3si9n.com/revisiting-implicits-without-import-tax

To maintain compatibility, we can keep `rddToPairRDDFunctions` in SparkContext but remove the `implicit`. The disadvantage is that there are two copies of the same code.

Best Regards,
Shixiong Zhu

2014-11-14 3:57 GMT+08:00 Reynold Xin r...@databricks.com: ...
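For reference, a rough sketch of where such an implicit could live, based on the implicit def shown earlier in the thread (illustrative only; it reuses the existing PairRDDFunctions and leaves the old SparkContext implicit in place for compatibility):

    package org.apache.spark.rdd

    import scala.reflect.ClassTag

    // Implicit in RDD's companion object: found by the compiler's implicit search
    // without any import, so `rdd.groupByKey()` compiles without
    // `import org.apache.spark.SparkContext._`.
    object RDD {
      implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
          (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
        new PairRDDFunctions(rdd)
    }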
Re: Cache sparkSql data without uncompressing it in memory
No, the columnar buffer is built in small batches; the batch size is controlled by the spark.sql.inMemoryColumnarStorage.batchSize property. The default value for this in master and branch-1.2 is 10,000 rows per batch.

On 11/14/14 1:27 AM, Sadhan Sood wrote:

Thanks Cheng. Just one more question - does that mean that we still need enough memory in the cluster to uncompress the data before it can be compressed again, or does it just read the raw data as is?

On Wed, Nov 12, 2014 at 10:05 PM, Cheng Lian lian.cs@gmail.com wrote: ...
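For illustration, that batch size can be tuned through the same SQL configuration mechanism; a one-line sketch, assuming an existing sqlContext (the value shown is just the default mentioned above):

    // Illustrative only: number of rows per in-memory columnar batch
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")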
Re: Re: Problems with spark.locality.wait
In the specific example stated, the user had two task sets if I understood right... the first task set reads off the db (dfs in your example), does some filter, etc., and caches it. The second works off the cached data (which is, now, process-local locality-level aware) to do map, group, etc.

The task set(s) which work off the cached data would be sensitive to the PROCESS_LOCAL locality level. But for the initial task set (which loaded off hdfs/database, etc.) no tasks can be process-local - since we do not have a way to specify that in Spark (which, imo, is a limitation).

Given this, the requirement seemed to be to relax the locality level for the initial load task set - since not scheduling on rack-local or other nodes seems to be hurting utilization and latency when no node-local executors are available. But for task sets which have process-local tasks, the user wants to ensure that node/rack-local scheduling does not happen (based on the timeouts and perf numbers). Hence my suggestion on setting the individual locality-level timeouts - of course, my suggestion was highly specific to the problem as stated :-) It is, by no means, a generalization - and I do agree we definitely need to address the larger scheduling issue.

Regards,
Mridul

On Fri, Nov 14, 2014 at 2:05 AM, Kay Ousterhout k...@eecs.berkeley.edu wrote: ...
Re: [MLlib] Contributing Algorithm for Outlier Detection
Hi Ashutosh,

Please edit the README file. I think the following function call is changed now:

  model = OutlierWithAVFModel.outliers(master: String, input dir: String, percentage: Double)

Regards,
Meethu Mathew
Engineer
Flytxt

On Friday 14 November 2014 12:01 AM, Ashutosh wrote: ...
Re: About implicit rddToPairRDDFunctions
That seems like a great idea. Can you submit a pull request?

On Thu, Nov 13, 2014 at 7:13 PM, Shixiong Zhu zsxw...@gmail.com wrote: ...
Re: About implicit rddToPairRDDFunctions
OK. I'll take it.

Best Regards,
Shixiong Zhu

2014-11-14 12:34 GMT+08:00 Reynold Xin r...@databricks.com: ...
Re: [MLlib] Contributing Algorithm for Outlier Detection
Hi, I have a doubt regarding the input to your algorithm. _http://www.linkedin.com/home?trk=hb_tab_home_top_ val model = OutlierWithAVFModel.outliers(data :RDD[Vector[String]], percent : Double, sc :SparkContext) Here our input data is an RDD[Vector[String]]. How we can create this RDD from a file? sc.textFile will simply give us an RDD, how to make it a Vector[String]? Could you plz share any code snippet of this conversion if you have.. Regards, Meethu Mathew On Friday 14 November 2014 10:02 AM, Meethu Mathew wrote: Hi Ashutosh, Please edit the README file.I think the following function call is changed now. |model = OutlierWithAVFModel.outliers(master:String, input dir:String , percentage:Double||) | Regards, *Meethu Mathew* *Engineer* *Flytxt* _http://www.linkedin.com/home?trk=hb_tab_home_top_ On Friday 14 November 2014 12:01 AM, Ashutosh wrote: Hi Anant, Please see the changes. https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala I have changed the input format to Vector of String. I think we can also make it generic. Line 59 72 : that counter will not affect in parallelism, Since it only work on one datapoint. It only does the Indexing of the column. Rest all side effects have been removed. Thanks, Ashutosh From: slcclimber [via Apache Spark Developers List] ml-node+s1001551n9287...@n3.nabble.com Sent: Tuesday, November 11, 2014 11:46 PM To: Ashutosh Trivedi (MT2013030) Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection Mayur, Libsvm format sounds good to me. I could work on writing the tests if that helps you? Anant On Nov 11, 2014 11:06 AM, Ashutosh [via Apache Spark Developers List] [hidden email]/user/SendEmail.jtp?type=nodenode=9287i=0 wrote: Hi Mayur, Vector data types are implemented using breeze library, it is presented at .../org/apache/spark/mllib/linalg Anant, One restriction I found that a vector can only be of 'Double', so it actually restrict the user. What are you thoughts on LibSVM format? Thanks for the comments, I was just trying to get away from those increment /decrement functions, they look ugly. Points are noted. I'll try to fix them soon. Tests are also required for the code. Regards, Ashutosh From: Mayur Rustagi [via Apache Spark Developers List] ml-node+[hidden email]http://user/SendEmail.jtp?type=nodenode=9286i=0 Sent: Saturday, November 8, 2014 12:52 PM To: Ashutosh Trivedi (MT2013030) Subject: Re: [MLlib] Contributing Algorithm for Outlier Detection We should take a vector instead giving the user flexibility to decide data source/ type What do you mean by vector datatype exactly? Mayur Rustagi Ph: a href=tel:%2B1%20%28760%29%20203%203257 value=+17602033257 target=_blank+1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Nov 5, 2014 at 6:45 AM, slcclimber [hidden email]http://user/SendEmail.jtp?type=nodenode=9239i=0 wrote: Ashutosh, I still see a few issues. 1. On line 112 you are counting using a counter. Since this will happen in a RDD the counter will cause issues. Also that is not good functional style to use a filter function with a side effect. You could use randomSplit instead. This does not the same thing without the side effect. 2. Similar shared usage of j in line 102 is going to be an issue as well. also hash seed does not need to be sequential it could be randomly generated or hashed on the values. 3. The compute function and trim scores still runs on a comma separeated RDD. 
We should take a vector instead, giving the user flexibility to decide the data source/type: what if we want data from Hive tables, or in Parquet, JSON, or Avro formats? A comma-separated format is very restrictive. With vectors, the user can take in whatever data format they like and convert it to vectors, instead of reading JSON files, creating a CSV file, and then working on that.
4. The similar use of counters on lines 54 and 65 is an issue. Basically, shared-state counters are a huge problem that does not scale, since the processing of RDDs is distributed and the value j lives on the master. Anant

On Tue, Nov 4, 2014 at 7:22 AM, Ashutosh [via Apache Spark Developers List] [hidden email] wrote: Anant, I got rid of those increment/decrement functions and the code is much cleaner now. Please check; all your comments have been addressed. https://github.com/codeAshu/Outlier-Detection-with-AVF-Spark/blob/master/OutlierWithAVFModel.scala Ashu
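For context on point 1, a minimal, self-contained sketch of the randomSplit approach being suggested; the weights, seed, and input data here are illustrative and not taken from the OutlierWithAVFModel code.

  import org.apache.spark.{SparkConf, SparkContext}

  object RandomSplitSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("RandomSplitSketch"))
      val data = sc.parallelize(1 to 1000)

      // Instead of filtering with a predicate that increments a shared counter
      // (the counter lives on the driver and is not updated reliably by executors),
      // split the RDD by random weights with no shared state at all.
      val Array(sample, rest) = data.randomSplit(Array(0.2, 0.8), seed = 42L)

      println(s"sample = ${sample.count()}, rest = ${rest.count()}")
      sc.stop()
    }
  }

randomSplit assigns each element to exactly one of the output RDDs according to the given weights, which covers the "take roughly N% of the points" use case without any counter or other side effect.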
Re: [MLlib] Contributing Algorithm for Outlier Detection
Please use the following snippet. I am still working on making it a generic vector, so that the input does not always have to be Vector[String], but String will work fine for now.

def main(args: Array[String]) {
  val sc = new SparkContext("local", "OutlierDetection")
  val dir = "hdfs://localhost:54310/train3" // your file path
  val data = sc.textFile(dir).map(word => word.split(",").toVector)
  val model = OutlierWithAVFModel.outliers(data, 20, sc)
  model.score.saveAsTextFile("../scores")
  model.trimmed_data.saveAsTextFile(".../trimmed")
}
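On the "generic vector" point, a hedged sketch of what a more generic entry point could look like; the type parameter, object name, return type, and placeholder body below are hypothetical illustrations, not the current OutlierWithAVFModel API.

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  object GenericOutlierSketch {
    // Hypothetical generic signature: accept vectors of any element type T,
    // so callers can pass Vector[String], Vector[Int], Vector[Double], etc.
    def outliers[T: ClassTag](data: RDD[Vector[T]],
                              percent: Double,
                              sc: SparkContext): RDD[(Vector[T], Double)] = {
      // Placeholder scoring so the sketch compiles; the real algorithm would
      // compute attribute-value frequencies per column and sum them per row.
      data.map(row => (row, 0.0))
    }
  }

The point of the sketch is only the signature: with a type parameter (plus a ClassTag bound for the eventual per-column aggregation), the caller decides how to turn their source data into vectors, rather than being tied to comma-separated strings.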