Re: What's the advantage features of Spark SQL(JDBC)
@Hao, As you said, there is no particular advantage to JDBC itself; it just provides a unified API to support different data sources. Is that right? On Friday, May 15, 2015 2:46 PM, Cheng, Hao hao.ch...@intel.com wrote: Spark SQL just takes JDBC as a new data source, the same way we support loading data from a .csv or .json file. From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID] Sent: Friday, May 15, 2015 2:30 PM To: User Subject: What's the advantage features of Spark SQL(JDBC) Hi All, Compared to direct access via JDBC, what are the advantages of Spark SQL (JDBC) for accessing an external data source? Any tips are welcome! Thanks. Regards, Yi
Re: Spark on Mesos vs Yarn
Hi Tim, Thanks for such a detailed email. I am excited to hear about the new features. I had a pull request going for adding attribute-based filtering in the mesos scheduler but it hasn't received much love - https://github.com/apache/spark/pull/5563 . I am a fan of the mesos/marathon/mesosphere and spark ecosystems and am trying to push adoption at my workplace. I would love to see documentation, tutorials (anything actually) that would make mesos + spark a better and more fleshed out solution. Would it be possible for you to share some links to the JIRAs and pull requests so that I can keep track of the progress/features? Again, thanks for replying. -- Ankur Chauhan On 15/05/2015 00:39, Tim Chen wrote: Hi Ankur, This is a great question as I've heard similar concerns about Spark on Mesos. At the time when I started to contribute to Spark on Mesos approx. half a year ago, the Mesos scheduler and related code hadn't really got much attention from anyone and it was pretty much in maintenance mode. As a Mesos PMC member who is really interested in Spark, I started to refactor and check out different JIRAs and PRs around the Mesos scheduler, and after that started to fix various bugs in Spark, added documentation and also fixed related Mesos issues as well. Just recently for 1.4 we've merged in Cluster mode and Docker support, and there are also pending PRs around framework authentication, multi-role support, dynamic allocation, finer-tuned coarse-grained mode scheduling configurations, etc. And finally I just want to mention that Mesosphere and Typesafe are collaborating to bring a certified distribution (https://databricks.com/spark/certification/certified-spark-distribution) of Spark on Mesos and DCOS, and we will be pouring resources into not just maintaining Spark on Mesos but driving more features into the Mesos scheduler and also into Mesos, so stateful services can leverage new APIs and features to make better scheduling decisions and optimizations. I don't have a solidified roadmap to share yet, but we will be discussing this and hopefully can share it with the community soon. In summary, Spark on Mesos is not dead or in maintenance mode; look forward to seeing a lot more changes from us and the community. Tim On Thu, May 14, 2015 at 11:30 PM, Ankur Chauhan an...@malloc64.com wrote: Hi, This is both a survey-type as well as a roadmap query question. It seems that of the cluster options to run spark (i.e. via YARN and Mesos), YARN seems to be getting a lot more attention and patches when compared to Mesos. Would it be correct to assume that spark on mesos is more or less dead or something like a maintenance-only feature and YARN is the recommended way to go? What is the roadmap for spark on mesos? And what is the roadmap for spark on yarn? I like mesos, so as much as I would like to see it thrive, I don't think the spark community is active there (or maybe it just appears that way). Another more community-oriented question: what do most people use to run spark in production or more-than-POC products? Why did you make that decision? There was a similar post from early 2014 where Matei answered that mesos and yarn were equally important, but has this changed as spark has now reached almost the 1.4.0 stage?
-- Ankur Chauhan
Re: Spark performance in cluster mode using yarn
Hi Ayan, I am asking about general scenarios given the info/configuration, from experts, not something specific. The Java code is nothing more than getting a Hive context and running a select query; there is no serialization or any other complex thing, just straightforward, 10 lines of code. Group, please suggest if you have any ideas. Regards Sachin On Fri, May 15, 2015 at 6:57 AM, ayan guha guha.a...@gmail.com wrote: With this information it is hard to predict. What's the performance you are getting? What's your desired performance? Maybe you can post your code and experts can suggest improvements? On 14 May 2015 15:02, sachin Singh sachin.sha...@gmail.com wrote: Hi Friends, please can someone give me an idea: ideally what should the time (complete job execution) be for a Spark job? I have data in a Hive table; the amount of data is about 1 GB, 2 lakh rows for the whole month. I want to do monthly aggregation using SQL queries/groupBy. I have only one node, 1 cluster, and the below configuration for running the job: --num-executors 2 --driver-memory 3g --driver-java-options -XX:MaxPermSize=1G --executor-memory 2g --executor-cores 2 Approximately how much time is required to finish the job, or can someone suggest the best way to get results quickly? Thanks in advance,
RE: What's the advantage features of Spark SQL(JDBC)
Spark SQL just takes JDBC as a new data source, the same way we support loading data from a .csv or .json file. From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID] Sent: Friday, May 15, 2015 2:30 PM To: User Subject: What's the advantage features of Spark SQL(JDBC) Hi All, Compared to direct access via JDBC, what are the advantages of Spark SQL (JDBC) for accessing an external data source? Any tips are welcome! Thanks. Regards, Yi
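For illustration, a rough sketch of that unified data source API in the Spark 1.3-era Scala API (the JDBC URL, table name and file path below are made-up examples, and the JDBC driver is assumed to be on the classpath):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// JDBC is just one more source behind the same DataFrame API...
val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://dbhost:3306/mydb?user=u&password=p",
  "dbtable" -> "orders"))
// ...exactly like a JSON-backed source
val jsonDF = sqlContext.load("/data/events.json", "json")
// From here on both are plain DataFrames, so the benefit is uniformity of the API
// (plus whatever pruning/filter pushdown the source supports), not extra JDBC features.
jdbcDF.join(jsonDF, jdbcDF("order_id") === jsonDF("order_id")).count()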
Re: how to delete data from table in sparksql
Got it, thank you. Thanks & Best regards! San.Luo ----- Original Message ----- From: Michael Armbrust mich...@databricks.com To: Denny Lee denny.g@gmail.com Cc: 罗辉 luohui20...@sina.com, user user@spark.apache.org Subject: Re: how to delete data from table in sparksql Date: 2015-05-15 01:49 The list of unsupported hive features should mention that it implicitly includes features added after Hive 13. You cannot yet compile with Hive 13, though we are investigating this for 1.5. On Thu, May 14, 2015 at 6:40 AM, Denny Lee denny.g@gmail.com wrote: Delete from table is available as part of Hive 0.14 (reference: Apache Hive Language Manual DML - Delete) while Spark 1.3 defaults to Hive 0.13. Perhaps rebuild Spark with Hive 0.14 or generate a new table filtering out the values you do not want. On Thu, May 14, 2015 at 3:26 AM luohui20...@sina.com wrote: Hi guys, I need to delete some data from a table with delete from table where name = xxx; however, delete is not functioning like the DML operation in hive. I got info like the below: Usage: delete [FILE|JAR|ARCHIVE] value [value]* 15/05/14 18:18:24 ERROR processors.DeleteResourceProcessor: Usage: delete [FILE|JAR|ARCHIVE] value [value]* I checked the list of Supported Hive Features, but did not find whether this DML is supported. So any comments will be appreciated. Thanks & Best regards! San.Luo
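For what it's worth, Denny's workaround (building a new table that filters out the unwanted rows) might look roughly like the sketch below; the table and column names are made up and a HiveContext is assumed:

// Hypothetical names; filter out the rows you would have deleted, then write a new table.
val kept = hiveContext.sql("SELECT * FROM my_table WHERE name <> 'xxx'")
kept.registerTempTable("my_table_kept")
hiveContext.sql("CREATE TABLE my_table_cleaned AS SELECT * FROM my_table_kept")
// Optionally drop or rename the old table afterwards with regular Hive DDL.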
Re: Spark on Mesos vs Yarn
Hi Ankur, This is a great question as I've heard similar concerns about Spark on Mesos. At the time when I started to contribute to Spark on Mesos approx. half a year ago, the Mesos scheduler and related code hadn't really got much attention from anyone and it was pretty much in maintenance mode. As a Mesos PMC member who is really interested in Spark, I started to refactor and check out different JIRAs and PRs around the Mesos scheduler, and after that started to fix various bugs in Spark, added documentation and also fixed related Mesos issues as well. Just recently for 1.4 we've merged in Cluster mode and Docker support, and there are also pending PRs around framework authentication, multi-role support, dynamic allocation, finer-tuned coarse-grained mode scheduling configurations, etc. And finally I just want to mention that Mesosphere and Typesafe are collaborating to bring a certified distribution (https://databricks.com/spark/certification/certified-spark-distribution) of Spark on Mesos and DCOS, and we will be pouring resources into not just maintaining Spark on Mesos but driving more features into the Mesos scheduler and also into Mesos, so stateful services can leverage new APIs and features to make better scheduling decisions and optimizations. I don't have a solidified roadmap to share yet, but we will be discussing this and hopefully can share it with the community soon. In summary, Spark on Mesos is not dead or in maintenance mode; look forward to seeing a lot more changes from us and the community. Tim On Thu, May 14, 2015 at 11:30 PM, Ankur Chauhan an...@malloc64.com wrote: Hi, This is both a survey-type as well as a roadmap query question. It seems that of the cluster options to run spark (i.e. via YARN and Mesos), YARN seems to be getting a lot more attention and patches when compared to Mesos. Would it be correct to assume that spark on mesos is more or less dead or something like a maintenance-only feature and YARN is the recommended way to go? What is the roadmap for spark on mesos? And what is the roadmap for spark on yarn? I like mesos, so as much as I would like to see it thrive, I don't think the spark community is active there (or maybe it just appears that way). Another more community-oriented question: what do most people use to run spark in production or more-than-POC products? Why did you make that decision? There was a similar post from early 2014 where Matei answered that mesos and yarn were equally important, but has this changed as spark has now reached almost the 1.4.0 stage? -- Ankur Chauhan
Re: What's the advantage features of Spark SQL(JDBC)
OK. Thanks. On Friday, May 15, 2015 3:35 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes. From: Yi Zhang [mailto:zhangy...@yahoo.com] Sent: Friday, May 15, 2015 2:51 PM To: Cheng, Hao; User Subject: Re: What's the advantage features of Spark SQL(JDBC) @Hao, As you said, there is no particular advantage to JDBC itself; it just provides a unified API to support different data sources. Is that right? On Friday, May 15, 2015 2:46 PM, Cheng, Hao hao.ch...@intel.com wrote: Spark SQL just takes JDBC as a new data source, the same way we support loading data from a .csv or .json file. From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID] Sent: Friday, May 15, 2015 2:30 PM To: User Subject: What's the advantage features of Spark SQL(JDBC) Hi All, Compared to direct access via JDBC, what are the advantages of Spark SQL (JDBC) for accessing an external data source? Any tips are welcome! Thanks. Regards, Yi
Why association with remote system has failed when set master in Spark programmatically
Hi all, I run start-master.sh to start standalone Spark with spark://192.168.1.164:7077. Then I use the command below, and it's OK: ./bin/spark-shell --master spark://192.168.1.164:7077 The console prints the correct messages, and the Spark context is initialised correctly. However, when I run the app in IntelliJ IDEA using a Spark conf like this: val sparkConf = new SparkConf().setAppName("FromMySql") .setMaster("spark://192.168.1.164:7077") .set("spark.akka.heartbeat.interval", "100") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) it can't talk to Spark and prints these error messages: ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@192.168.1.164:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. If I change the conf to local[*], it's OK. After I package my app and use the spark-submit command, the communication between the local and remote actor is OK. It's very strange! What is happening? Regards, Yi
RE: question about sparksql caching
You probably can try something like: val df = sqlContext.sql("select c1, sum(c2) from T1, T2 where T1.key=T2.key group by c1") df.cache() // Cache the result, but it's a lazy execution. df.registerAsTempTable("my_result") sqlContext.sql("select * from my_result where c1=1").collect // the cache execution will be triggered here when the first query hits it sqlContext.sql("select * from my_result where c1=1").collect // the cache is already there, so this will be very fast And you can also cache the raw tables like: sqlContext.cacheTable("T1") sqlContext.cacheTable("T2") They will also be cached when the first query comes, and we benefit from the in-memory columnar storage. One thing you should know is that the cache here cannot be shared across processes (more precisely, cannot cross the SparkContext instance). -Original Message- From: sequoiadb [mailto:mailing-list-r...@sequoiadb.com] Sent: Friday, May 15, 2015 11:02 AM To: user Subject: question about sparksql caching Hi all, We are planning to use SparkSQL in a DW system. There's a question about the caching mechanism of SparkSQL. For example, if I have a SQL like sqlContext.sql("select c1, sum(c2) from T1, T2 where T1.key=T2.key group by c1").cache() Is it going to cache the final result or the raw data of each table used in the SQL? Since users may have various SQLs that use those tables, if the caching is for the final result only, it may still take a very long time to scan the entire table for a brand new SQL. If this is the case, is there any other better way to cache the base tables instead of the final result? Thanks
RE: What's the advantage features of Spark SQL(JDBC)
Yes. From: Yi Zhang [mailto:zhangy...@yahoo.com] Sent: Friday, May 15, 2015 2:51 PM To: Cheng, Hao; User Subject: Re: What's the advantage features of Spark SQL(JDBC) @Hao, As you said, there is no particular advantage to JDBC itself; it just provides a unified API to support different data sources. Is that right? On Friday, May 15, 2015 2:46 PM, Cheng, Hao hao.ch...@intel.com wrote: Spark SQL just takes JDBC as a new data source, the same way we support loading data from a .csv or .json file. From: Yi Zhang [mailto:zhangy...@yahoo.com.INVALID] Sent: Friday, May 15, 2015 2:30 PM To: User Subject: What's the advantage features of Spark SQL(JDBC) Hi All, Compared to direct access via JDBC, what are the advantages of Spark SQL (JDBC) for accessing an external data source? Any tips are welcome! Thanks. Regards, Yi
What's the advantage features of Spark SQL(JDBC)
Hi All, Compared to direct access via JDBC, what are the advantages of Spark SQL (JDBC) for accessing an external data source? Any tips are welcome! Thanks. Regards, Yi
Spark on Mesos vs Yarn
Hi, This is both a survey-type as well as a roadmap query question. It seems that of the cluster options to run spark (i.e. via YARN and Mesos), YARN seems to be getting a lot more attention and patches when compared to Mesos. Would it be correct to assume that spark on mesos is more or less dead or something like a maintenance-only feature and YARN is the recommended way to go? What is the roadmap for spark on mesos? And what is the roadmap for spark on yarn? I like mesos, so as much as I would like to see it thrive, I don't think the spark community is active there (or maybe it just appears that way). Another more community-oriented question: what do most people use to run spark in production or more-than-POC products? Why did you make that decision? There was a similar post from early 2014 where Matei answered that mesos and yarn were equally important, but has this changed as spark has now reached almost the 1.4.0 stage? -- Ankur Chauhan
Re: Spark on Mesos vs Yarn
Hi Ankur, Just to add a thought to Tim's excellent answer, Spark on Mesos is very important to us and is the recommended deployment for our customers at Typesafe. Thanks for pointing to your PR; I see Tim already went through a round of reviews. It seems very useful, I'll give it a try as well. thanks, iulian On Fri, May 15, 2015 at 9:53 AM, Ankur Chauhan an...@malloc64.com wrote: Hi Tim, Thanks for such a detailed email. I am excited to hear about the new features. I had a pull request going for adding attribute-based filtering in the mesos scheduler but it hasn't received much love - https://github.com/apache/spark/pull/5563 . I am a fan of the mesos/marathon/mesosphere and spark ecosystems and am trying to push adoption at my workplace. I would love to see documentation, tutorials (anything actually) that would make mesos + spark a better and more fleshed out solution. Would it be possible for you to share some links to the JIRAs and pull requests so that I can keep track of the progress/features? Again, thanks for replying. -- Ankur Chauhan On 15/05/2015 00:39, Tim Chen wrote: Hi Ankur, This is a great question as I've heard similar concerns about Spark on Mesos. At the time when I started to contribute to Spark on Mesos approx. half a year ago, the Mesos scheduler and related code hadn't really got much attention from anyone and it was pretty much in maintenance mode. As a Mesos PMC member who is really interested in Spark, I started to refactor and check out different JIRAs and PRs around the Mesos scheduler, and after that started to fix various bugs in Spark, added documentation and also fixed related Mesos issues as well. Just recently for 1.4 we've merged in Cluster mode and Docker support, and there are also pending PRs around framework authentication, multi-role support, dynamic allocation, finer-tuned coarse-grained mode scheduling configurations, etc. And finally I just want to mention that Mesosphere and Typesafe are collaborating to bring a certified distribution (https://databricks.com/spark/certification/certified-spark-distribution) of Spark on Mesos and DCOS, and we will be pouring resources into not just maintaining Spark on Mesos but driving more features into the Mesos scheduler and also into Mesos, so stateful services can leverage new APIs and features to make better scheduling decisions and optimizations. I don't have a solidified roadmap to share yet, but we will be discussing this and hopefully can share it with the community soon. In summary, Spark on Mesos is not dead or in maintenance mode; look forward to seeing a lot more changes from us and the community. Tim On Thu, May 14, 2015 at 11:30 PM, Ankur Chauhan an...@malloc64.com wrote: Hi, This is both a survey-type as well as a roadmap query question. It seems that of the cluster options to run spark (i.e. via YARN and Mesos), YARN seems to be getting a lot more attention and patches when compared to Mesos. Would it be correct to assume that spark on mesos is more or less dead or something like a maintenance-only feature and YARN is the recommended way to go? What is the roadmap for spark on mesos? And what is the roadmap for spark on yarn? I like mesos, so as much as I would like to see it thrive, I don't think the spark community is active there (or maybe it just appears that way). Another more community-oriented question: what do most people use to run spark in production or more-than-POC products? Why did you make that decision?
There was a similar post from early 2014 where Matei answered that mesos and yarn were equally important, but has this changed as spark has now reached almost the 1.4.0 stage? -- Ankur Chauhan -- Iulian Dragos -- Reactive Apps on the JVM www.typesafe.com
Re: Worker Spark Port
So I'm using code like this to use specific ports: val conf = new SparkConf() .setMaster(master) .setAppName("namexxx") .set("spark.driver.port", "51810") .set("spark.fileserver.port", "51811") .set("spark.broadcast.port", "51812") .set("spark.replClassServer.port", "51813") .set("spark.blockManager.port", "51814") .set("spark.executor.port", "51815") My question now is: will the master forward the spark.executor.port value (to use) to the worker when it hands it a task to do? Also, the property spark.executor.port is different from the Worker spark port, so how can I make the Worker run on a specific port? Regards jk On Wed, May 13, 2015 at 7:51 PM, James King jakwebin...@gmail.com wrote: Indeed, many thanks. On Wednesday, 13 May 2015, Cody Koeninger c...@koeninger.org wrote: I believe most ports are configurable at this point, look at http://spark.apache.org/docs/latest/configuration.html and search for .port On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com wrote: I understand that this port value is randomly selected. Is there a way to enforce which spark port a Worker should use?
Grouping and storing unordered time series data stream to HDFS
Hi all, I have a stream of data from Kafka that I want to process and store in hdfs using Spark Streaming. Each data has a date/time dimension and I want to write data within the same time dimension to the same hdfs directory. The data stream might be unordered (by time dimension). I'm wondering what are the best practices in grouping/storing time series data stream using Spark Streaming? I'm considering grouping each batch of data in Spark Streaming per time dimension and then saving each group to different hdfs directories. However since it is possible for data with the same time dimension to be in different batches, I would need to handle update in case the hdfs directory already exists. Is this a common approach? Are there any other approaches that I can try? Thank you! Nisrina.
Forbidden : Error Code: 403
Hello list, Scenario: I am trying to read an Avro file stored in S3 and create a DataFrame out of it using the Spark-Avro https://github.com/databricks/spark-avro library, but I am unable to do so. This is the code which I am using: public class S3DataFrame { public static void main(String[] args) { System.out.println("START..."); SparkConf conf = new SparkConf().setAppName("DataFrameDemo").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); Configuration config = sc.hadoopConfiguration(); config.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); config.set("fs.s3a.access.key", ); config.set("fs.s3a.secret.key", *); config.set("fs.s3a.endpoint", "s3-us-west-2.amazonaws.com"); SQLContext sqlContext = new SQLContext(sc); DataFrame df = sqlContext.load("s3a://bucket-name/file.avro", "com.databricks.spark.avro"); df.show(); df.printSchema(); df.select("title").show(); System.out.println("DONE"); // df.save("/new/dir/", "com.databricks.spark.avro"); } } Problem: Getting Exception in thread main com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden). And this is the complete exception trace: Exception in thread main com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 63A603F1DC6FB900), S3 Extended Request ID: vh5XhXSVO5ZvhX8c4I3tOWQD/T+B0ZW/MCYzUnuNnQ0R2JoBmJ0MPmUePRiQnPVASTbkonoFPIg= at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1088) at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:735) at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3743) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027) at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1005) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:688) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:71) at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57) at org.apache.hadoop.fs.Globber.glob(Globber.java:248) at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1623) at com.databricks.spark.avro.AvroRelation.newReader(AvroRelation.scala:105) at com.databricks.spark.avro.AvroRelation.init(AvroRelation.scala:60) at com.databricks.spark.avro.DefaultSource.createRelation(DefaultSource.scala:41) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:219) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:673) at org.myorg.dataframe.S3DataFrame.main(S3DataFrame.java:25) Would really appreciate some help. Thank you so much for your precious time. Software versions used: spark-1.3.1-bin-hadoop2.4 hadoop-aws-2.6.0.jar MAC OS X 10.10.3 java version 1.6.0_65 Tariq, Mohammad about.me/mti
Re: kafka + Spark Streaming with checkPointing fails to start with
I had the same problem. The solution I found was to use: JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate("checkpoint_dir", contextFactory); ALL configuration should be performed inside contextFactory. If you try to configure the streaming context after getOrCreate, you receive a "has not been initialized" error. On 13.05.2015 00:51, Ankur Chauhan wrote: Hi, I have a simple application which fails with the following exception only when the application is restarted (i.e. the checkpointDir has entries from a previous execution): Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115) at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5) at com.brightcove.analytics.tacoma.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) The relevant source is: class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) { // create kafka stream val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic)) // KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2) val eventStream = rawlogDStream .map({ case (key, rawlogVal) => val record = rawlogVal.asInstanceOf[GenericData.Record] val rlog = RawLog.newBuilder()
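To make the "configure everything inside the factory" point concrete, a rough Scala sketch of the checkpoint-recovery pattern is below (the checkpoint directory, topic and broker are placeholders, not the original application's code):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("rawlog-processor")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///checkpoints/rawlog-processor")
  // ALL DStream wiring must happen in here; nothing may be added after getOrCreate().
  val kafkaParams = Map("metadata.broker.list" -> "broker:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("rawlogs"))
  stream.map(_._2).count().print()
  ssc
}

val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/rawlog-processor", createContext _)
ssc.start()
ssc.awaitTermination()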
Re: store hive metastore on persistent store
This should work. Which version of Spark are you using? Here is what I do -- make sure hive-site.xml is in the conf directory of the machine you're using the driver from. Now let's run spark-shell from that machine: scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc) hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@6e9f8f26 scala> hc.sql("show tables").collect 15/05/15 09:34:17 INFO metastore: Trying to connect to metastore with URI thrift://hostname.com:9083 -- here should be a value from your hive-site.xml 15/05/15 09:34:17 INFO metastore: Waiting 1 seconds before next connection attempt. 15/05/15 09:34:18 INFO metastore: Connected to metastore. res0: Array[org.apache.spark.sql.Row] = Array([table1,false], scala> hc.getConf("hive.metastore.uris") res13: String = thrift://hostname.com:9083 scala> hc.getConf("hive.metastore.warehouse.dir") res14: String = /user/hive/warehouse The first line tells you which metastore it's trying to connect to -- this should be the string specified under the hive.metastore.uris property in your hive-site.xml file. I have not mucked with warehouse.dir too much but I know that the value of the metastore URI is in fact picked up from there, as I regularly point to different systems... On Thu, May 14, 2015 at 6:26 PM, Tamas Jambor jambo...@gmail.com wrote: I have tried to put the hive-site.xml file in the conf/ directory; it seems it is not picking it up from there. On Thu, May 14, 2015 at 6:50 PM, Michael Armbrust mich...@databricks.com wrote: You can configure Spark SQL's hive interaction by placing a hive-site.xml file in the conf/ directory. On Thu, May 14, 2015 at 10:24 AM, jamborta jambo...@gmail.com wrote: Hi all, is it possible to set hive.metastore.warehouse.dir, which is internally created by spark, to be stored externally (e.g. s3 on aws or wasb on azure)? thanks,
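As a rough illustration of pointing the warehouse at external storage from the application side (the bucket and path are made up, this assumes the S3 credentials are already configured for the underlying Hadoop filesystem, and the same key can equally be set in hive-site.xml):

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.setConf("hive.metastore.warehouse.dir", "s3n://my-bucket/hive/warehouse")
// New managed tables created through this context should land under the warehouse dir.
hc.sql("CREATE TABLE t_demo (id INT, name STRING)")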
Re: Custom Aggregate Function for DataFrame
Can you kindly elaborate on this? It should be possible to write UDAFs along similar lines to sum/min etc. On Fri, May 15, 2015 at 5:49 AM, Justin Yip yipjus...@prediction.io wrote: Hello, May I know if there is a way to implement an aggregate function for grouped data in a DataFrame? I dug into the docs but didn't find anything apart from the UDF functions, which apply on a Row. Maybe I have missed something. Thanks. Justin -- Best Regards, Ayan Guha
Re: Worker Spark Port
Hi, I think you are mixing things up a bit. The Worker is part of the cluster, so it is governed by the cluster manager. If you are running a standalone cluster, then you can modify spark-env and configure SPARK_WORKER_PORT. Executors, on the other hand, are bound to an application, i.e. a spark context. Thus you modify executor properties through a context. So, master != driver and executor != worker. Best Ayan On Fri, May 15, 2015 at 7:52 PM, James King jakwebin...@gmail.com wrote: So I'm using code like this to use specific ports: val conf = new SparkConf() .setMaster(master) .setAppName("namexxx") .set("spark.driver.port", "51810") .set("spark.fileserver.port", "51811") .set("spark.broadcast.port", "51812") .set("spark.replClassServer.port", "51813") .set("spark.blockManager.port", "51814") .set("spark.executor.port", "51815") My question now is: will the master forward the spark.executor.port value (to use) to the worker when it hands it a task to do? Also, the property spark.executor.port is different from the Worker spark port, so how can I make the Worker run on a specific port? Regards jk On Wed, May 13, 2015 at 7:51 PM, James King jakwebin...@gmail.com wrote: Indeed, many thanks. On Wednesday, 13 May 2015, Cody Koeninger c...@koeninger.org wrote: I believe most ports are configurable at this point, look at http://spark.apache.org/docs/latest/configuration.html and search for .port On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com wrote: I understand that this port value is randomly selected. Is there a way to enforce which spark port a Worker should use? -- Best Regards, Ayan Guha
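For the worker side of what ayan describes, the port is set in the cluster's own configuration rather than in application code, e.g. with something like the following in conf/spark-env.sh on each worker (the port numbers here are only examples):

export SPARK_WORKER_PORT=51800
export SPARK_WORKER_WEBUI_PORT=51801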
Re: Grouping and storing unordered time series data stream to HDFS
Hi, Do you have a cut-off time, i.e. how late an event can be? Otherwise, you may consider a different persistent store like Cassandra/HBase and delegate the "update" part to it. On Fri, May 15, 2015 at 8:10 PM, Nisrina Luthfiyati nisrina.luthfiy...@gmail.com wrote: Hi all, I have a stream of data from Kafka that I want to process and store in hdfs using Spark Streaming. Each data has a date/time dimension and I want to write data within the same time dimension to the same hdfs directory. The data stream might be unordered (by time dimension). I'm wondering what are the best practices in grouping/storing time series data stream using Spark Streaming? I'm considering grouping each batch of data in Spark Streaming per time dimension and then saving each group to different hdfs directories. However since it is possible for data with the same time dimension to be in different batches, I would need to handle update in case the hdfs directory already exists. Is this a common approach? Are there any other approaches that I can try? Thank you! Nisrina. -- Best Regards, Ayan Guha
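One rough sketch of the "group each batch by time dimension and write to per-date directories" idea (the field extraction and paths are made up; writing each batch into its own subdirectory side-steps the "directory already exists" problem, at the cost of many small files):

stream.foreachRDD { (rdd, batchTime) =>
  // extractDate is a hypothetical function that pulls the date dimension out of an event.
  val byDate = rdd.map(event => (extractDate(event), event))
  val dates = byDate.map(_._1).distinct().collect()
  dates.foreach { date =>
    byDate.filter(_._1 == date).values
      .saveAsTextFile(s"hdfs:///events/date=$date/batch-${batchTime.milliseconds}")
  }
}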
Re: Why association with remote system has failed when set master in Spark programmatically
I debugged it, and the remote actor can be fetched in the tryRegisterAllMasters() method in AppClient: def tryRegisterAllMasters() { for (masterAkkaUrl <- masterAkkaUrls) { logInfo("Connecting to master " + masterAkkaUrl + "...") val actor = context.actorSelection(masterAkkaUrl) actor ! RegisterApplication(appDescription) } } After the actor sends the RegisterApplication message, it seems like the message is not routed to the remote actor, so the registration is not finished and then fails. But I don't know the reason. Can anyone help me? On Friday, May 15, 2015 4:06 PM, Yi Zhang zhangy...@yahoo.com.INVALID wrote: Hi all, I run start-master.sh to start standalone Spark with spark://192.168.1.164:7077. Then I use the command below, and it's OK: ./bin/spark-shell --master spark://192.168.1.164:7077 The console prints the correct messages, and the Spark context is initialised correctly. However, when I run the app in IntelliJ IDEA using a Spark conf like this: val sparkConf = new SparkConf().setAppName("FromMySql") .setMaster("spark://192.168.1.164:7077") .set("spark.akka.heartbeat.interval", "100") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) it can't talk to Spark and prints these error messages: ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@192.168.1.164:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. If I change the conf to local[*], it's OK. After I package my app and use the spark-submit command, the communication between the local and remote actor is OK. It's very strange! What is happening? Regards, Yi
Re: Forbidden : Error Code: 403
Have you verified that you can download the file from bucket-name without using Spark? Seems like a permission issue. Cheers On May 15, 2015, at 5:09 AM, Mohammad Tariq donta...@gmail.com wrote: Hello list, Scenario: I am trying to read an Avro file stored in S3 and create a DataFrame out of it using the Spark-Avro library, but I am unable to do so. This is the code which I am using: public class S3DataFrame { public static void main(String[] args) { System.out.println("START..."); SparkConf conf = new SparkConf().setAppName("DataFrameDemo").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); Configuration config = sc.hadoopConfiguration(); config.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); config.set("fs.s3a.access.key", ); config.set("fs.s3a.secret.key", *); config.set("fs.s3a.endpoint", "s3-us-west-2.amazonaws.com"); SQLContext sqlContext = new SQLContext(sc); DataFrame df = sqlContext.load("s3a://bucket-name/file.avro", "com.databricks.spark.avro"); df.show(); df.printSchema(); df.select("title").show(); System.out.println("DONE"); // df.save("/new/dir/", "com.databricks.spark.avro"); } } Problem: Getting Exception in thread main com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden). Would really appreciate some help. Thank you so much for your precious time. Software versions used: spark-1.3.1-bin-hadoop2.4 hadoop-aws-2.6.0.jar MAC OS X 10.10.3 java version 1.6.0_65 Tariq, Mohammad about.me/mti
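One way to confirm whether it is purely a permission problem, as Ted suggests, is to fetch the same object's metadata directly with the AWS SDK that hadoop-aws uses underneath (the bucket, key and credential values are placeholders):

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client

val s3 = new AmazonS3Client(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))
s3.setEndpoint("s3-us-west-2.amazonaws.com")
// A 403 here as well would point to IAM/bucket policy rather than Spark or spark-avro.
val meta = s3.getObjectMetadata("bucket-name", "file.avro")
println(meta.getContentLength)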
Re: Worker Spark Port
I think this answers my question: "Executors, on the other hand, are bound to an application, i.e. a spark context. Thus you modify executor properties through a context." Many thanks. jk On Fri, May 15, 2015 at 3:23 PM, ayan guha guha.a...@gmail.com wrote: Hi, I think you are mixing things up a bit. The Worker is part of the cluster, so it is governed by the cluster manager. If you are running a standalone cluster, then you can modify spark-env and configure SPARK_WORKER_PORT. Executors, on the other hand, are bound to an application, i.e. a spark context. Thus you modify executor properties through a context. So, master != driver and executor != worker. Best Ayan On Fri, May 15, 2015 at 7:52 PM, James King jakwebin...@gmail.com wrote: So I'm using code like this to use specific ports: val conf = new SparkConf() .setMaster(master) .setAppName("namexxx") .set("spark.driver.port", "51810") .set("spark.fileserver.port", "51811") .set("spark.broadcast.port", "51812") .set("spark.replClassServer.port", "51813") .set("spark.blockManager.port", "51814") .set("spark.executor.port", "51815") My question now is: will the master forward the spark.executor.port value (to use) to the worker when it hands it a task to do? Also, the property spark.executor.port is different from the Worker spark port, so how can I make the Worker run on a specific port? Regards jk On Wed, May 13, 2015 at 7:51 PM, James King jakwebin...@gmail.com wrote: Indeed, many thanks. On Wednesday, 13 May 2015, Cody Koeninger c...@koeninger.org wrote: I believe most ports are configurable at this point, look at http://spark.apache.org/docs/latest/configuration.html and search for .port On Wed, May 13, 2015 at 9:38 AM, James King jakwebin...@gmail.com wrote: I understand that this port value is randomly selected. Is there a way to enforce which spark port a Worker should use? -- Best Regards, Ayan Guha
Re: SPARK-4412 regressed?
(I made you a Contributor in JIRA -- your yahoo-related account of the two -- so maybe that will let you do so.) On Fri, May 15, 2015 at 4:19 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi, two questions 1. Can regular JIRA users reopen bugs -- I can open a new issue but it does not appear that I can reopen issues. What is the proper protocol to follow if we discover regressions? 2. I believe SPARK-4412 regressed in Spark 1.3.1, according to this SO thread possibly even in 1.3.0 http://stackoverflow.com/questions/30052889/how-to-suppress-parquet-log-messages-in-spark
SPARK-4412 regressed?
Hi, two questions 1. Can regular JIRA users reopen bugs -- I can open a new issue but it does not appear that I can reopen issues. What is the proper protocol to follow if we discover regressions? 2. I believe SPARK-4412 regressed in Spark 1.3.1, according to this SO thread possibly even in 1.3.0 http://stackoverflow.com/questions/30052889/how-to-suppress-parquet-log-messages-in-spark
Re: Error communicating with MapOutputTracker
Hi Imran, Thanks for the advice; tweaking some akka parameters helped. See below. Now, we noticed that we get java heap OOM exceptions on the output tracker when we have too many tasks. I wonder: 1. where does the map output tracker live? The driver? The master (when those are not the same)? 2. how can we increase the heap for it? Especially when using spark-submit? Thanks, Thomas PS: akka parameters that one might want to increase: # akka timeouts/heartbeats settings multiplied by 10 to avoid problems spark.akka.timeout 1000 spark.akka.heartbeat.pauses 6 spark.akka.failure-detector.threshold 3000.0 spark.akka.heartbeat.interval 1 # Hidden akka conf to avoid MapOutputTracker timeouts # See https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala spark.akka.askTimeout 300 spark.akka.lookupTimeout 300 On Fri, Mar 20, 2015 at 9:18 AM, Imran Rashid iras...@cloudera.com wrote: Hi Thomas, sorry for such a late reply. I don't have any super-useful advice, but this seems like something that is important to follow up on. To answer your immediate question: no, there should not be any hard limit to the number of tasks that MapOutputTracker can handle. Though of course as things get bigger, the overheads increase, which is why you might hit timeouts. Two other minor suggestions: (1) increase spark.akka.askTimeout -- that's the timeout you are running into, it defaults to 30 seconds (2) as you've noted, you've needed to play w/ other timeouts b/c of long GC pauses -- it's possible some GC tuning might help, though it's a bit of a black art so it's hard to say what you can try. You could always try Concurrent Mark Sweep to avoid the long pauses, but of course that will probably hurt overall performance. Can you share any more details of what you are trying to do? Since you're fetching shuffle blocks in a shuffle map task, I guess you've got two shuffles back-to-back, e.g. someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}. Do you expect to be doing a lot of GC in between the two shuffles? E.g., in the little example I have, if there were lots of objects being created in the map/filter steps that will make it out of the eden space. One possible solution to this would be to force the first shuffle to complete, before running any of the subsequent transformations, e.g. by forcing materialization to the cache first val intermediateRDD = someRDD.reduceByKey{...}.persist(DISK) intermediateRDD.count() // force the shuffle to complete, without trying to do our complicated downstream logic at the same time val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...} Also, can you share your data size? Do you expect the shuffle to be skewed, or do you think it will be well-balanced? Not that I'll have any suggestions for you based on the answer, but it may help us reproduce it and try to fix whatever the root cause is. thanks, Imran On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber thomas.ger...@radius.com wrote: I meant spark.default.parallelism of course. On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com wrote: Follow up: We retried again, this time after *decreasing* spark.parallelism. It was set to 16000 before (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores). And it got past the point where it failed before. Does the MapOutputTracker have a limit on the number of tasks it can track?
On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com wrote: Hello, We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error which leads to a failed stage: Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker We tried the whole application again, and it failed on the same stage (but it got more tasks completed on that stage) with the same error. We then looked at executors stderr, and all show similar logs, on both runs (see below). As far as we can tell, executors and master have disk space left. *Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?* Thanks Thomas In case it matters, our akka settings: spark.akka.frameSize 50 spark.akka.threads 8 // those below are 10* the default, to cope with large GCs spark.akka.timeout 1000 spark.akka.heartbeat.pauses 6 spark.akka.failure-detector.threshold 3000.0 spark.akka.heartbeat.interval 1 Appendix: executor logs, where it starts going awry 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage
Best practice to avoid ambiguous columns in DataFrame.join
Hello, I would like to know if there are recommended ways of preventing ambiguous columns when joining dataframes. When we join dataframes, it often happens that we join on a column with an identical name. I could rename the columns on the right data frame, as described in the following code. Is there a better way to achieve this? scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "b"), (4, "b"))) df: org.apache.spark.sql.DataFrame = [_1: int, _2: string] scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3, 30), (4, 40))) df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int] scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema Thanks. Justin
Re: Broadcast variables can be rebroadcast?
Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
On Fri, May 15, 2015 at 2:35 PM, Thomas Dudziak tom...@gmail.com wrote: I've just been through this exact case with shaded guava in our Mesos setup and that is how it behaves there (with Spark 1.3.1). If that's the case, it's a bug in the Mesos backend, since the spark.* options should behave exactly the same as SPARK_CLASSPATH. It would be nice to know whether that is also the case in 1.4 (I took a quick look at the related code and it seems correct), but I don't have Mesos around to test. On Fri, May 15, 2015 at 12:04 PM, Marcelo Vanzin van...@cloudera.com wrote: On Fri, May 15, 2015 at 11:56 AM, Thomas Dudziak tom...@gmail.com wrote: Actually the extraClassPath settings put the extra jars at the end of the classpath so they won't help. Only the deprecated SPARK_CLASSPATH puts them at the front. That's definitely not the case for YARN: https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1013 And it's been like that for as far as I remember. I'm almost sure that's also the case for standalone, at least in master / 1.4, since I touched a lot of that code recently. It would be really weird if those options worked differently from SPARK_CLASSPATH, since they were meant to replace it. On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin van...@cloudera.com wrote: Ah, I see. yeah, it sucks that Spark has to expose Optional (and things it depends on), but removing that would break the public API, so... One last thing you could try is do add your newer Guava jar to spark.driver.extraClassPath and spark.executor.extraClassPath. Those settings will place your jars before Spark's in the classpath, so you'd actually be using the newer versions of the conflicting classes everywhere. It does require manually distributing the Guava jar to the same location on all nodes in the cluster, though. If that doesn't work. Thomas's suggestion of shading Guava in your app can be used as a last resort. On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: The problem is with 1.3.1 It has Function class (mentioned in exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually backport to 1.2.2, which is working fine. *From:* Marcelo Vanzin [mailto:van...@cloudera.com] *Sent:* Thursday, May 14, 2015 6:27 PM *To:* Anton Brazhnyk *Cc:* user@spark.apache.org *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial deployments What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes). Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat. On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. 
Exception happens when some components try to use “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders: java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances to be changed in Guava significantly, so any “external” Guava can provide them. So, could such problems be fixed if those Spark’s pieces of Guava would be in separate jar and could be excluded from the mix (substituted by “external” Guava)? Thanks, Anton -- Marcelo -- Marcelo -- Marcelo -- Marcelo
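For reference, a minimal sketch of the extraClassPath approach Marcelo suggests above. These keys are normally set in conf/spark-defaults.conf or passed with --conf to spark-submit rather than in code (by the time the driver's SparkConf exists, the driver JVM classpath is already fixed), and whether the entries end up before or after Spark's own jars is exactly what this thread is debating, so treat it as something to verify on your cluster manager. The jar path is a placeholder and must exist on every node.

import org.apache.spark.SparkConf

// Illustrative only: /opt/libs/guava-16.0.1.jar is an assumed location.
val conf = new SparkConf()
  .setAppName("guava-classpath-example")
  .set("spark.driver.extraClassPath", "/opt/libs/guava-16.0.1.jar")
  .set("spark.executor.extraClassPath", "/opt/libs/guava-16.0.1.jar")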
Re: Using groupByKey with Spark SQL
Perhaps you are looking for GROUP BY and collect_set, which would allow you to stay in SQL. I'll add that in Spark 1.4 you can get access to items of a row by name. On Fri, May 15, 2015 at 10:48 AM, Edward Sargisson ejsa...@gmail.com wrote: Hi all, This might be a question to be answered or feedback for a possibly new feature, depending: We have source data which is events about the state changes of an entity (identified by an ID) represented as nested JSON. We wanted to sessionize this data so that we had a collection of all the events for a given ID, as we have to do more processing based on what we find. We tried doing this using Spark SQL and then converting to a JavaPairRDD using DataFrame.javaRdd.groupByKey. The schema inference worked great, but what was frustrating was that the result of groupByKey is <String, Iterable<Row>>. Rows only have get(int) methods and don't take notice of the schema stuff, so they ended up being something we didn't want to work with. We are currently solving this problem by ignoring Spark SQL and deserializing the event JSON into a POJO for further processing. Are there better approaches to this? Perhaps Spark should have a DataFrame.groupByKey that returns Rows that can be used with the schema stuff? Thanks! Edward
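A sketch of the GROUP BY plus collect_set idea mentioned above, assuming a HiveContext (collect_set is a Hive UDAF in this era) and assuming the events expose an "id" field and a primitive "event_json" column; the path, table and column names are illustrative, not from the original post.

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

// events.json is a placeholder path; schema inference gives us the fields
val events = sqlContext.jsonFile("hdfs:///path/to/events.json")
events.registerTempTable("events")

// one row per entity id, with all of its event payloads collected together
val sessions = sqlContext.sql(
  "SELECT id, collect_set(event_json) AS events FROM events GROUP BY id")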
Re: Best practice to avoid ambiguous columns in DataFrame.join
There are several ways to solve this ambiguity: *1. use the DataFrames to get the attribute so it's already resolved and not just a string we need to map to a DataFrame.* df.join(df2, df("_1") === df2("_1")) *2. Use aliases* df.as('a).join(df2.as('b), $"a._1" === $"b._1") *3. rename the columns as you suggested.* df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema *4. (Spark 1.4 only) use def join(right: DataFrame, usingColumn: String): DataFrame* df.join(df2, "_1") This has the added benefit of only outputting a single _1 column. On Fri, May 15, 2015 at 3:44 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to know if there are recommended ways of preventing ambiguous columns when joining dataframes. When we join dataframes, it usually happens that we join on columns with identical names. I could rename the columns on the right data frame, as described in the following code. Is there a better way to achieve this? scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "b"), (4, "b"))) df: org.apache.spark.sql.DataFrame = [_1: int, _2: string] scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3, 30), (4, 40))) df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int] scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema Thanks. Justin -- View this message in context: Best practice to avoid ambiguous columns in DataFrame.join http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-to-avoid-ambiguous-columns-in-DataFrame-join-tp22907.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Broadcast variables can be rebroadcast?
Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Broadcast variables can be rebroadcast?
The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Broadcast variables can be rebroadcast?
Nope. It will just work when you call x.value. On Fri, May 15, 2015 at 5:39 PM N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Custom Aggregate Function for DataFrame
Hi Ayan, I have a DF constructed from the following case class Event: case class State(attr1: String) case class Event(userId: String, time: Long, state: State) I would like to generate a DF which contains the latest state of each userId. I could first compute the latest time for each user, and join it back to the original data frame, but that involves two shuffles. Hence I would like to see if there are ways to improve the performance. Thanks. Justin On Fri, May 15, 2015 at 6:32 AM, ayan guha guha.a...@gmail.com wrote: can you kindly elaborate on this? it should be possible to write udafs along similar lines to sum/min etc. On Fri, May 15, 2015 at 5:49 AM, Justin Yip yipjus...@prediction.io wrote: Hello, May I know if there is a way to implement an aggregate function for grouped data in a DataFrame? I dug into the doc but didn't find any apart from the UDF functions which apply on a Row. Maybe I have missed something. Thanks. Justin -- View this message in context: Custom Aggregate Function for DataFrame http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Aggregate-Function-for-DataFrame-tp22893.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com. -- Best Regards, Ayan Guha
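One way to get the latest Event per userId with a single shuffle is to drop back to the RDD API and reduce by key; this is a sketch only, assuming the original RDD[Event] is available (or the DataFrame's rows are mapped back to Event first). Names follow the case classes in the message above.

import org.apache.spark.rdd.RDD

case class State(attr1: String)
case class Event(userId: String, time: Long, state: State)

def latestStates(events: RDD[Event]): RDD[Event] =
  events
    .map(e => (e.userId, e))
    // keep whichever event has the larger timestamp; one shuffle in total
    .reduceByKey((a, b) => if (a.time >= b.time) a else b)
    .values

A DataFrame-side alternative would be a window function over userId ordered by time, which became available through HiveContext in Spark 1.4.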
Re: Best practice to avoid ambiguous columns in DataFrame.join
Thanks Michael, This is very helpful. I have a follow up question related to NaFunctions. Usually after a left outer join, we get lots of null values and we need to handle them before further processing. I have the following piece of code; the _1 column is duplicated and crashes the .na.fill functions. From your answer, it appears that Spark 1.4 resolves this issue as only a single _1 column is outputted. Do you know if there is a good workaround for Spark 1.3? scala> df3.show _1 a 1 a 2 b 3 b 4 b scala> df4.show _1 b 1 10 2 null 3 3 4 0 scala> df3.join(df4, df3("_1") === df4("_1")).na.fill(-999) org.apache.spark.sql.AnalysisException: Reference '_1' is ambiguous, could be: _1#33, _1#31.; at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:229) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:128) at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:161) ... Thanks! Justin On Fri, May 15, 2015 at 3:55 PM, Michael Armbrust mich...@databricks.com wrote: There are several ways to solve this ambiguity: *1. use the DataFrames to get the attribute so it's already resolved and not just a string we need to map to a DataFrame.* df.join(df2, df("_1") === df2("_1")) *2. Use aliases* df.as('a).join(df2.as('b), $"a._1" === $"b._1") *3. rename the columns as you suggested.* df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema *4. (Spark 1.4 only) use def join(right: DataFrame, usingColumn: String): DataFrame* df.join(df2, "_1") This has the added benefit of only outputting a single _1 column. On Fri, May 15, 2015 at 3:44 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to know if there are recommended ways of preventing ambiguous columns when joining dataframes. When we join dataframes, it usually happens that we join on columns with identical names. I could rename the columns on the right data frame, as described in the following code. Is there a better way to achieve this? scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "b"), (4, "b"))) df: org.apache.spark.sql.DataFrame = [_1: int, _2: string] scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3, 30), (4, 40))) df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int] scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === $"right_key").printSchema Thanks. Justin -- View this message in context: Best practice to avoid ambiguous columns in DataFrame.join http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-to-avoid-ambiguous-columns-in-DataFrame-join-tp22907.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
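A possible Spark 1.3-era workaround, sketched from option 3 above rather than confirmed in the thread: rename the right-hand key before joining so there is no duplicate "_1" column, which lets na.fill resolve every column name unambiguously. DataFrame and column names follow Justin's example; -999 is his fill value, and na functions assume 1.3.1.

import org.apache.spark.sql.functions.col

// after the rename only df3 still owns a "_1" column, so nothing is ambiguous
val joined = df3
  .join(df4.withColumnRenamed("_1", "right_key"), df3("_1") === col("right_key"), "left_outer")
  .na.fill(-999)

The extra right_key column can then be projected away with a select if it is not needed downstream.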
Hive Skew flag?
Just wondering if we have any timeline on when the hive skew flag will be included within SparkSQL? Thanks! Denny
FetchFailedException and MetadataFetchFailedException
I am trying to sort a collection of key,value pairs (between several hundred million to a few billion) and have recently been getting lots of FetchFailedException errors that seem to originate when one of the executors doesn't seem to find a temporary shuffle file on disk. E.g.: org.apache.spark.shuffle.FetchFailedException: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index (No such file or directory) This file actually exists: ls -l /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index This error repeats on several executors and is followed by a number of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 This results on most tasks being lost and executors dying. There is plenty of space on all of the appropriate filesystems, so none of the executors are running out of disk space. Any idea what might be causing this? I am running this via YARN on approximately 100 nodes with 2 cores per node. Any thoughts on what might be causing these errors? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to reshape RDD/Spark DataFrame
Hi all, I am a student trying to learn Spark and I had a question regarding converting rows to columns (data pivot/reshape). I have some data in the following format (either RDD or Spark DataFrame): from pyspark.sql import SQLContext from pyspark.sql.types import StructType, StructField, StringType, IntegerType sqlContext = SQLContext(sc) rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) # convert to a Spark DataFrame schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlContext.createDataFrame(rdd, schema) What I would like to do is to 'reshape' the data, converting certain rows in Country (specifically US, UK and CA) into columns: ID Age US UK CA 'X01' 41 3 1 2 'X02' 72 4 6 7 Essentially, I need something along the lines of Python's `pivot` workflow: categories = ['US', 'UK', 'CA'] new_df = df[df['Country'].isin(categories)].pivot(index = 'ID', columns = 'Country', values = 'Score') My dataset is rather large so I can't really `collect()` and ingest the data into memory to do the reshaping in Python itself. Is there a way to convert Python's `.pivot()` into an invokable function while mapping either an RDD or a Spark DataFrame? Any help would be appreciated! I had initially posted this question on Stack Overflow here http://stackoverflow.com/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes but the one suggested solution is verbose and error-prone and probably not scalable either. Any help would be greatly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-reshape-RDD-Spark-DataFrame-tp22909.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
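A hand-rolled pivot can be built from groupBy plus conditional aggregation. The sketch below is in Scala to match the rest of this digest, but the same DataFrame calls exist in PySpark; it assumes Spark 1.4, where functions.when is available, and uses the column names from the post.

import org.apache.spark.sql.functions.{max, when}

val categories = Seq("US", "UK", "CA")

// one output column per category: the Score where Country matches, else null;
// max(...) ignores the nulls, leaving the single matching score per (ID, Age)
val aggCols = categories.map(c => max(when(df("Country") === c, df("Score"))).as(c))

val pivoted = df.groupBy("ID", "Age").agg(aggCols.head, aggCols.tail: _*)
pivoted.show()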
RE: Spark's Guava pieces cause exceptions in non-trivial deployments
For me it wouldn’t help I guess, because those newer classes would still be loaded by different classloader. What did work for me with 1.3.1 – removing of those classes from Spark’s jar completely, so they get loaded from external Guava (the version I prefer) and by the classloader I expect. That’s why I proposed to put them into separate maven artifact where they could be just excluded in the build of the app that depends on Spark. From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: Friday, May 15, 2015 11:55 AM To: Anton Brazhnyk Cc: user@spark.apache.org Subject: Re: Spark's Guava pieces cause exceptions in non-trivial deployments Ah, I see. yeah, it sucks that Spark has to expose Optional (and things it depends on), but removing that would break the public API, so... One last thing you could try is do add your newer Guava jar to spark.driver.extraClassPath and spark.executor.extraClassPath. Those settings will place your jars before Spark's in the classpath, so you'd actually be using the newer versions of the conflicting classes everywhere. It does require manually distributing the Guava jar to the same location on all nodes in the cluster, though. If that doesn't work. Thomas's suggestion of shading Guava in your app can be used as a last resort. On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk anton.brazh...@genesys.commailto:anton.brazh...@genesys.com wrote: The problem is with 1.3.1 It has Function class (mentioned in exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually backport to 1.2.2, which is working fine. From: Marcelo Vanzin [mailto:van...@cloudera.commailto:van...@cloudera.com] Sent: Thursday, May 14, 2015 6:27 PM To: Anton Brazhnyk Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Spark's Guava pieces cause exceptions in non-trivial deployments What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes). Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat. On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk anton.brazh...@genesys.commailto:anton.brazh...@genesys.com wrote: Greetings, I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. Exception happens when some components try to use “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders: java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances to be changed in Guava significantly, so any “external” Guava can provide them. 
So, could such problems be fixed if those Spark’s pieces of Guava would be in separate jar and could be excluded from the mix (substituted by “external” Guava)? Thanks, Anton -- Marcelo -- Marcelo
[spark sql] $ and === can't be recognised in IntelliJ
Hi all, I wanted to join data frames using Spark SQL in IntelliJ, and wrote the code below: df1.as('first).join(df2.as('second), $"first._1" === $"second._1") IntelliJ reported the error for $ and === in red colour. I found $ and === are defined as implicit conversions in org.apache.spark.sql.catalyst.dsl.ExpressionConversions and org.apache.spark.sql.catalyst.dsl.ImplicitOperators, so I was trying to import them to solve the issue; however, it doesn't work. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-and-can-t-be-recognised-in-IntelliJ-tp22910.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
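A sketch of what usually makes this compile in user code: the $"..." interpolator comes from the SQLContext's implicits (not from the catalyst dsl package), and === is simply a method on Column, so it resolves once the columns do. This assumes a plain SQLContext named sqlContext; df1/df2 are the frames from the message above.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // brings the $"..." (StringToColumn) syntax into scope

val joined = df1.as("first").join(df2.as("second"), $"first._1" === $"second._1")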
Why association with remote system has failed when set master in Spark programmatically
Hi all, I run start-master.sh to start standalone Spark with spark://192.168.1.164:7077. Then, I use this command as below, and it's OK: ./bin/spark-shell --master spark://192.168.1.164:7077 The console print correct message, and Spark context had been initialised correctly. However, when I run app in IntelliJ Idea using spark conf like this: val sparkConf = new SparkConf().setAppName(FromMySql) .setMaster(spark://192.168.1.164:7077) .set(spark.akka.heartbeat.interval, 100) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) It can't talk to spark and print these error messages: ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@192.168.1.164:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. If I changed the conf to local[*], it's ok. After I packaged my app then use spark-submit command, the communication between local and remote actor is OK. It's very strange! Then I debugged it, and the remote actor can be fetched correctly in the tryRegisterAllMasters() method of AppClient: def tryRegisterAllMasters() { for (masterAkkaUrl - masterAkkaUrls) { logInfo(Connecting to master + masterAkkaUrl + ...) val actor = context.actorSelection(masterAkkaUrl) actor ! RegisterApplication(appDescription) } } After actor send the RegisterApplication message, it seems like the message is not routed to the remote actor, so registering operation is not finished, then failed. I don't know what is the reason. Who know the answer? Regards, Yi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-association-with-remote-system-has-failed-when-set-master-in-Spark-programmatically-tp22911.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark sql and csv data processing question
Hi Im getting the following error when trying to process a csv based data file. Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost task 1.3 in stage 10.0 (TID 262, hc2r1m3.semtech-solutions.co.nz): java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142) at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) I have made sure that none of my data rows are empty and that they all have 15 records. I have also physically checked the data. The error occurs when I run the actual spark sql on the last line. The script is as follows. val server= hdfs://hc2nn.semtech-solutions.co.nz:8020 val path = /data/spark/h2o/ val train_csv = server + path + adult.train.data // 32,562 rows val test_csv = server + path + adult.test.data // 16,283 rows // load the data val rawTrainData = sparkCxt.textFile(train_csv) val rawTestData = sparkCxt.textFile(test_csv) // create a spark sql schema for the row val schemaString = age workclass fnlwgt education educationalnum maritalstatus + occupation relationship race gender capitalgain capitalloss + hoursperweek nativecountry income val schema = StructType( schemaString.split( ) .map(fieldName = StructField(fieldName, StringType, false))) // create an RDD from the raw training data val trainRDD = rawTrainData .filter(!_.isEmpty) .map(rawRow = Row.fromSeq(rawRow.split(,) .filter(_.length == 15) .map(_.toString).map(_.trim) )) println( Raw Training Data Count = + trainRDD.count() ) val testRDD = rawTestData .filter(!_.isEmpty) .map(rawRow = Row.fromSeq(rawRow.split(,) .filter(_.length == 15) .map(_.toString).map(_.trim) )) println( Raw Testing Data Count = + testRDD.count() ) // create a schema RDD val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema) val testSchemaRDD = sqlContext.applySchema(testRDD, schema) // register schema RDD as a table trainSchemaRDD.registerTempTable(trainingTable) testSchemaRDD.registerTempTable(testingTable) println( Schema RDD Training Data Count = + trainSchemaRDD.count() ) println( Schema RDD Testing Data Count = + testSchemaRDD.count() ) // now run sql against the table to filter the data val schemaRddTrain = sqlContext.sql( SELECT + age,workclass,education,maritalstatus,occupation,relationship,race,+ gender,hoursperweek,nativecountry,income + FROM trainingTable LIMIT 5000) println( Training Data Count = + schemaRddTrain.count() ) Any advice is appreciated :)
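A hedged guess at the failure and a sketch of a fix: in the script above, .filter(_.length == 15) runs on each field string produced by split(",") (keeping only fields whose text is exactly 15 characters long), not on the row, so most Rows end up with far fewer than 15 values and the later SQL projection indexes past the end. Filtering on the field count of the whole split row avoids that. Variable names follow the original script.

import org.apache.spark.sql.Row

val trainRDD = rawTrainData
  .filter(!_.isEmpty)
  .map(_.split(",").map(_.trim))   // split each line into trimmed fields
  .filter(_.length == 15)          // keep only rows with exactly 15 fields
  .map(fields => Row.fromSeq(fields))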
Re: Broadcast variables can be rebroadcast?
Hi broadcast variables are shipped for the first time it is accessed in a transformation to the executors used by the transformation. It will NOT updated subsequently, even if the value has changed. However, a new value will be shipped to any new executor comes into play after the value has changed. This way, changing value of broadcast variable is not a good idea as it can create inconsistency within cluster. From documentatins: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards, Ayan Guha
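A common pattern for changing lookup data, sketched here as an assumption rather than an official "update" API: create a brand new broadcast variable whenever the data changes and let subsequent work read the new reference, optionally unpersisting the old one. loadLookup() and the value type are illustrative names.

import org.apache.spark.broadcast.Broadcast

// in a real app this would typically be a field of the driver-side component
var lookup: Broadcast[Map[String, String]] = sc.broadcast(loadLookup())

def refreshLookup(): Unit = {
  val old = lookup
  lookup = sc.broadcast(loadLookup())  // ship a fresh copy to the executors
  old.unpersist(blocking = false)      // free the stale copy on the cluster
}

In a streaming job the new reference is picked up on the next batch only if lookup.value is read inside the foreachRDD/transform closure rather than captured once at setup time.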
RE: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 15, 2015 6:45 PM To: Evo Eftimov Cc: user Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
Ok thanks a lot for clarifying that – btw was your application a Spark Streaming App – I am also looking for confirmation that FAIR scheduling is supported for Spark Streaming Apps From: Richard Marscher [mailto:rmarsc...@localytics.com] Sent: Friday, May 15, 2015 7:20 PM To: Evo Eftimov Cc: Tathagata Das; user Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond The doc is a bit confusing IMO, but at least for my application I had to use a fair pool configuration to get my stages to be scheduled with FAIR. On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me From: Tathagata Das [mailto:t...@databricks.com] Sent: Friday, May 15, 2015 6:45 PM To: Evo Eftimov Cc: user Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
The doc is a bit confusing IMO, but at least for my application I had to use a fair pool configuration to get my stages to be scheduled with FAIR. On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Friday, May 15, 2015 6:45 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: store hive metastore on persistent store
My point was more to how to verify that properties are picked up from the hive-site.xml file. You don't really need hive.metastore.uris if you're not running against an external metastore. I just did an experiment with warehouse.dir. My hive-site.xml looks like this: configuration property namehive.metastore.warehouse.dir/name value/home/ykadiysk/Github/warehouse_dir/value descriptionlocation of default database for the warehouse/description /property /configuration and spark-shell code: scala val hc= new org.apache.spark.sql.hive.HiveContext(sc) hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3036c16f scala hc.sql(show tables).collect 15/05/15 14:12:57 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/05/15 14:12:57 INFO ObjectStore: ObjectStore, initialize called 15/05/15 14:12:57 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/05/15 14:12:58 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/05/15 14:12:58 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/05/15 14:13:03 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/05/15 14:13:03 INFO ObjectStore: Initialized ObjectStore 15/05/15 14:13:04 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.12.0-protobuf-2.5 15/05/15 14:13:05 INFO HiveMetaStore: 0: get_tables: db=default pat=.* 15/05/15 14:13:05 INFO audit: ugi=ykadiysk ip=unknown-ip-addr cmd=get_tables: db=default pat=.* 15/05/15 14:13:05 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/05/15 14:13:05 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. res0: Array[org.apache.spark.sql.Row] = Array() scala hc.getConf(hive.metastore.warehouse.dir) res1: String = /home/ykadiysk/Github/warehouse_dir I have not tried an HDFS path but you should be at least able to verify that the variable is being read. It might be that your value is read but is otherwise not liked... On Fri, May 15, 2015 at 2:03 PM, Tamas Jambor jambo...@gmail.com wrote: thanks for the reply. 
I am trying to use it without hive setup (spark-standalone), so it prints something like this: hive_ctx.sql(show tables).collect() 15/05/15 17:59:03 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/05/15 17:59:03 INFO ObjectStore: ObjectStore, initialize called 15/05/15 17:59:04 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/05/15 17:59:04 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/05/15 17:59:04 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/05/15 17:59:05 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/05/15 17:59:08 INFO BlockManagerMasterActor: Registering block manager :42819 with 3.0 GB RAM, BlockManagerId(2, xxx, 42819) [0/1844] 15/05/15 17:59:18 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/05/15 17:59:18 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/05/15 17:59:20 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:20 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:29 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:31 INFO ObjectStore: Initialized ObjectStore 15/05/15 17:59:32 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa 15/05/15 17:59:33 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-azure-file-system.properties,hadoop-metrics2.properties 15/05/15 17:59:33 INFO MetricsSystemImpl: Scheduled snapshot period at 10 second(s). 15/05/15 17:59:33 INFO MetricsSystemImpl:
Re: how to use rdd.countApprox
Hi TD, Just let you know the job group and cancelation worked after I switched to spark 1.3.1. I set a group id for rdd.countApprox() and cancel it, then set another group id for the remaining job of the foreachRDD but let it complete. As a by-product, I use group id to indicate what the job does. :-) Thanks,Du On Wednesday, May 13, 2015 4:23 PM, Tathagata Das t...@databricks.com wrote: You might get stage information through SparkListener. But I am not sure whether you can use that information to easily kill stages. Though i highly recommend using Spark 1.3.1 (or even Spark master). Things move really fast between releases. 1.1.1 feels really old to me :P TD On Wed, May 13, 2015 at 1:25 PM, Du Li l...@yahoo-inc.com wrote: I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside dstream.foreachRDD{...}. After calling cancelJobGroup(), the spark context seems no longer valid, which crashes subsequent jobs. My spark version is 1.1.1. I will do more investigation into this issue, perhaps after upgrading to 1.3.1, and then file a JIRA if it persists. Is there a way to get stage or task id of a particular transformation or action on RDD and then selectively kill the stage or tasks? It would be necessary and useful in situations similar to countApprox. Thanks,Du On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com wrote: That is not supposed to happen :/ That is probably a bug.If you have the log4j logs, would be good to file a JIRA. This may be worth debugging. On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote: Actually I tried that before asking. However, it killed the spark context. :-) Du On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com wrote: That is a good question. I dont see a direct way to do that. You could do try the following val jobGroupId = group-id-based-on-current-timerdd.sparkContext.setJobGroup(jobGroupId)val approxCount = rdd.countApprox().getInitialValue // job launched with the set job grouprdd.sparkContext.cancelJobGroup(jobGroupId) // cancel the job On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote: Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results not used but consuming resources. Thanks,Du On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app is standing a much better chance to complete processing each batch within the batch interval. Du On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote: From the code it seems that as soon as the rdd.countApprox(5000) returns, you can call pResult.initialValue() to get the approximate count at that point of time (that is after timeout). Calling pResult.getFinalValue() will further block until the job is over, and give the final correct values that you would have received by rdd.count() On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote: HI, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return after it finishes completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same. 
val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
Can any expert here help explain the right way of usage? Thanks, Du On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote: I have to count RDD's in a spark streaming app. When data goes large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract count value from the partial results? Thanks, Du
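A sketch of the pattern Du describes earlier in this thread: tag the approximate count with its own job group, read the initial (timed-out) value, then cancel that group so the still-running count job stops consuming resources, and use a second group id for the rest of the batch. Group ids and the surrounding dstream are illustrative.

dstream.foreachRDD { rdd =>
  val countGroup = "countApprox-" + System.currentTimeMillis
  rdd.sparkContext.setJobGroup(countGroup, "approximate count")
  val approx = rdd.countApprox(5000).initialValue   // value available after ~5s
  rdd.sparkContext.cancelJobGroup(countGroup)       // stop the background count job

  rdd.sparkContext.setJobGroup("batch-processing", "rest of the batch")
  // ... remaining work for this batch, using approx.mean / approx.low / approx.high ...
}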
Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
It's not a Spark Streaming app, so sorry I'm not sure of the answer to that. I would assume it should work. On Fri, May 15, 2015 at 2:22 PM, Evo Eftimov evo.efti...@isecc.com wrote: Ok thanks a lot for clarifying that – btw was your application a Spark Streaming App – I am also looking for confirmation that FAIR scheduling is supported for Spark Streaming Apps *From:* Richard Marscher [mailto:rmarsc...@localytics.com] *Sent:* Friday, May 15, 2015 7:20 PM *To:* Evo Eftimov *Cc:* Tathagata Das; user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond The doc is a bit confusing IMO, but at least for my application I had to use a fair pool configuration to get my stages to be scheduled with FAIR. On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Friday, May 15, 2015 6:45 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
If you don't send jobs to different pools, then they will all end up in the default pool. If you leave the intra-pool scheduling policy as the default FIFO, then this will effectively be the same thing as using the default FIFO scheduling. Depending on what you are trying to accomplish, you need some combination of multiple pools and FAIR scheduling within one or more pools. And. of course, you need to actually place a job within an appropriate pool. On Fri, May 15, 2015 at 11:13 AM, Evo Eftimov evo.efti...@isecc.com wrote: No pools for the moment – for each of the apps using the straightforward way with the spark conf param for scheduling = FAIR Spark is running in a Standalone Mode Are you saying that Configuring Pools is mandatory to get the FAIR scheduling working – from the docs it seemed optional to me *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Friday, May 15, 2015 6:45 PM *To:* Evo Eftimov *Cc:* user *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond How are you configuring the fair scheduler pools? On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote: I have run / submitted a few Spark Streaming apps configured with Fair scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode. Is FAIR scheduling supported at all for Spark Streaming apps and from what release / version - e.g. 1.3.1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
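A sketch of the pieces that usually have to line up for FAIR scheduling, per the discussion above: enable FAIR mode, point at a pool allocation file, and actually place jobs into a pool from the submitting thread. The file path and pool name are illustrative; the pool's own schedulingMode inside fairscheduler.xml must also be FAIR for fair sharing within it.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-example")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // defines the pools

val sc = new SparkContext(conf)

// jobs submitted from this thread land in "pool1"
sc.setLocalProperty("spark.scheduler.pool", "pool1")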
Re: store hive metastore on persistent store
thanks for the reply. I am trying to use it without hive setup (spark-standalone), so it prints something like this: hive_ctx.sql(show tables).collect() 15/05/15 17:59:03 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/05/15 17:59:03 INFO ObjectStore: ObjectStore, initialize called 15/05/15 17:59:04 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/05/15 17:59:04 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/05/15 17:59:04 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/05/15 17:59:05 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/05/15 17:59:08 INFO BlockManagerMasterActor: Registering block manager :42819 with 3.0 GB RAM, BlockManagerId(2, xxx, 42819) [0/1844] 15/05/15 17:59:18 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/05/15 17:59:18 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/05/15 17:59:20 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:20 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:29 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/05/15 17:59:31 INFO ObjectStore: Initialized ObjectStore 15/05/15 17:59:32 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa 15/05/15 17:59:33 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-azure-file-system.properties,hadoop-metrics2.properties 15/05/15 17:59:33 INFO MetricsSystemImpl: Scheduled snapshot period at 10 second(s). 15/05/15 17:59:33 INFO MetricsSystemImpl: azure-file-system metrics system started 15/05/15 17:59:33 INFO HiveMetaStore: Added admin role in metastore 15/05/15 17:59:34 INFO HiveMetaStore: Added public role in metastore 15/05/15 17:59:34 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/05/15 17:59:35 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr. 15/05/15 17:59:37 INFO HiveMetaStore: 0: get_tables: db=default pat=.* 15/05/15 17:59:37 INFO audit: ugi=testuser ip=unknown-ip-addr cmd=get_tables: db=default pat=.* not sure what to put in hive.metastore.uris in this case? On Fri, May 15, 2015 at 2:52 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: This should work. Which version of Spark are you using? Here is what I do -- make sure hive-site.xml is in the conf directory of the machine you're using the driver from. 
Now let's run spark-shell from that machine: scala val hc= new org.apache.spark.sql.hive.HiveContext(sc) hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@6e9f8f26 scala hc.sql(show tables).collect 15/05/15 09:34:17 INFO metastore: Trying to connect to metastore with URI thrift://hostname.com:9083 -- here should be a value from your hive-site.xml 15/05/15 09:34:17 INFO metastore: Waiting 1 seconds before next connection attempt. 15/05/15 09:34:18 INFO metastore: Connected to metastore. res0: Array[org.apache.spark.sql.Row] = Array([table1,false], scala hc.getConf(hive.metastore.uris) res13: String = thrift://hostname.com:9083 scala hc.getConf(hive.metastore.warehouse.dir) res14: String = /user/hive/warehouse The first line tells you which metastore it's trying to connect to -- this should be the string specified under hive.metastore.uris property in your hive-site.xml file. I have not mucked with warehouse.dir too much but I know that the value of the metastore URI is in fact picked up from there as I regularly point to different systems... On Thu, May 14, 2015 at 6:26 PM, Tamas Jambor jambo...@gmail.com wrote: I have tried to put the hive-site.xml file in the conf/ directory with, seems it is not picking up from there. On Thu, May 14, 2015 at 6:50 PM, Michael Armbrust mich...@databricks.com wrote: You can configure Spark SQLs hive interaction by placing a hive-site.xml file in the conf/ directory. On Thu, May 14, 2015 at 10:24 AM, jamborta jambo...@gmail.com wrote: Hi all, is it possible to set hive.metastore.warehouse.dir, that is internally create by spark, to be stored externally (e.g. s3 on aws or wasb
Re: spark log field clarification
anybody shed some light for me? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-log-field-clarification-tp22892p22904.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Forbidded : Error Code: 403
On 15 May 2015, at 21:20, Mohammad Tariq donta...@gmail.com wrote: Thank you Ayan and Ted for the prompt response. It isn't working with s3n either. And I am able to download the file. In fact I am able to read the same file using s3 API without any issue. sounds like an S3n config problem. Check your configurations - you can test locally via the hdfs dfs command without even starting spark Oh, and if there is a / in your secret key, you're going to to need to generate new one. Long story - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Forbidded : Error Code: 403
Thanks for the suggestion Steve. I'll try that out. Read the long story last night while struggling with this :). I made sure that I don't have any '/' in my key. On Saturday, May 16, 2015, Steve Loughran ste...@hortonworks.com wrote: On 15 May 2015, at 21:20, Mohammad Tariq donta...@gmail.com javascript:; wrote: Thank you Ayan and Ted for the prompt response. It isn't working with s3n either. And I am able to download the file. In fact I am able to read the same file using s3 API without any issue. sounds like an S3n config problem. Check your configurations - you can test locally via the hdfs dfs command without even starting spark Oh, and if there is a / in your secret key, you're going to to need to generate new one. Long story -- [image: http://] Tariq, Mohammad about.me/mti [image: http://] http://about.me/mti
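A quick sanity check along the lines Steve suggests, sketched with placeholder bucket, key and credential values: set the s3n credentials on the Hadoop configuration and try reading the object directly from a small job before involving the full application.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

// placeholder bucket/key; a 403 here points at credentials or bucket policy,
// independent of anything Spark-specific
val lines = sc.textFile("s3n://your-bucket/path/to/object.csv")
println(lines.count())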
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
This is still a problem in 1.3. Optional is both used in several shaded classes within Guava (e.g. the Immutable* classes) and itself uses shaded classes (e.g. AbstractIterator). This causes problems in application code. The only reliable way we've found around this is to shade Guava ourselves for application code and thus avoid the problem altogether. cheers, Tom On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: The problem is with 1.3.1 It has Function class (mentioned in exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually backport to 1.2.2, which is working fine. *From:* Marcelo Vanzin [mailto:van...@cloudera.com] *Sent:* Thursday, May 14, 2015 6:27 PM *To:* Anton Brazhnyk *Cc:* user@spark.apache.org *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial deployments What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes). Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat. On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. Exception happens when some components try to use “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders: java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances to be changed in Guava significantly, so any “external” Guava can provide them. So, could such problems be fixed if those Spark’s pieces of Guava would be in separate jar and could be excluded from the mix (substituted by “external” Guava)? Thanks, Anton -- Marcelo
Re: Problem with current spark
Could your provide the full driver log? Looks like a bug. Thank you! Best Regards, Shixiong Zhu 2015-05-13 14:02 GMT-07:00 Giovanni Paolo Gibilisco gibb...@gmail.com: Hi, I'm trying to run an application that uses a Hive context to perform some queries over JSON files. The code of the application is here: https://github.com/GiovanniPaoloGibilisco/spark-log-processor/tree/fca93d95a227172baca58d51a4d799594a0429a1 I can run it on Spark 1.3.1 after rebuilding it with hive support using: mvn -Phive -Phive-thriftserver -DskipTests clean package but when I try to run the same application on the one built fromt he current master branch (at this commit of today https://github.com/apache/spark/tree/bec938f777a2e18757c7d04504d86a5342e2b49e) again built with hive support I get an error at Stage 2 that is not submitted, and after a while the application is killed. The logs look like this: 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 15/05/13 16:54:37 INFO DAGScheduler: Got job 2 (run at unknown:0) with 2 output partitions (allowLocal=false) 15/05/13 16:54:37 INFO DAGScheduler: Final stage: ResultStage 4(run at unknown:0) 15/05/13 16:54:37 INFO DAGScheduler: Parents of final stage: List() 15/05/13 16:54:37 INFO Exchange: Using SparkSqlSerializer2. 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0 ^C15/05/13 16:54:42 INFO SparkContext: Invoking stop() from shutdown hook 15/05/13 16:54:42 INFO SparkUI: Stopped Spark web UI at http://192.168.230.130:4040 15/05/13 16:54:42 INFO DAGScheduler: Stopping DAGScheduler 15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Shutting down all executors 15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 15/05/13 16:54:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
15/05/13 16:54:52 ERROR TaskSchedulerImpl: Lost executor 0 on 192.168.230.130: remote Rpc client disassociated 15/05/13 16:54:53 INFO AppClient$ClientActor: Executor updated: app-20150513165402-/0 is now EXITED (Command exited with code 0) 15/05/13 16:54:53 INFO SparkDeploySchedulerBackend: Executor app-20150513165402-/0 removed: Command exited with code 0 15/05/13 16:54:53 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 15/05/13 16:56:42 WARN AkkaRpcEndpointRef: Error sending message [message = StopExecutors] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266) at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.stop(SparkDeploySchedulerBackend.scala:95) at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416) at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1404) at org.apache.spark.SparkContext.stop(SparkContext.scala:1562) at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551) at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2252) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:) at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2204) at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) Should I submit an Issue for this? What is the best way to do it? Best
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
On Fri, May 15, 2015 at 11:56 AM, Thomas Dudziak tom...@gmail.com wrote: Actually the extraClassPath settings put the extra jars at the end of the classpath so they won't help. Only the deprecated SPARK_CLASSPATH puts them at the front. That's definitely not the case for YARN: https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1013 And it's been like that for as far as I remember. I'm almost sure that's also the case for standalone, at least in master / 1.4, since I touched a lot of that code recently. It would be really weird if those options worked differently from SPARK_CLASSPATH, since they were meant to replace it. On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin van...@cloudera.com wrote: Ah, I see. yeah, it sucks that Spark has to expose Optional (and things it depends on), but removing that would break the public API, so... One last thing you could try is do add your newer Guava jar to spark.driver.extraClassPath and spark.executor.extraClassPath. Those settings will place your jars before Spark's in the classpath, so you'd actually be using the newer versions of the conflicting classes everywhere. It does require manually distributing the Guava jar to the same location on all nodes in the cluster, though. If that doesn't work. Thomas's suggestion of shading Guava in your app can be used as a last resort. On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: The problem is with 1.3.1 It has Function class (mentioned in exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually backport to 1.2.2, which is working fine. *From:* Marcelo Vanzin [mailto:van...@cloudera.com] *Sent:* Thursday, May 14, 2015 6:27 PM *To:* Anton Brazhnyk *Cc:* user@spark.apache.org *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial deployments What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes). Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat. On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. Exception happens when some components try to use “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders: java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances to be changed in Guava significantly, so any “external” Guava can provide them. 
So, could such problems be fixed if those Spark’s pieces of Guava would be in separate jar and could be excluded from the mix (substituted by “external” Guava)? Thanks, Anton -- Marcelo -- Marcelo -- Marcelo
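In practice, the extraClassPath suggestion looks roughly like the sketch below. The jar path is hypothetical, and the same jar has to exist at that location on every node; the driver-side setting normally has to be supplied before the driver JVM starts, for example via spark-defaults.conf or the spark-submit command line.

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: prepend a newer Guava to the executor classpath.
    // /opt/libs/guava-16.0.1.jar is an assumption, not a path from this thread,
    // and the jar must be distributed to that location on every node.
    // The matching driver-side key (spark.driver.extraClassPath) should go in
    // spark-defaults.conf or on the spark-submit command line, since the driver
    // JVM is already running by the time this SparkConf is read.
    val conf = new SparkConf()
      .setAppName("guava-16-app")
      .set("spark.executor.extraClassPath", "/opt/libs/guava-16.0.1.jar")
    val sc = new SparkContext(conf)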
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
Actually the extraClassPath settings put the extra jars at the end of the classpath so they won't help. Only the deprecated SPARK_CLASSPATH puts them at the front. cheers, Tom On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin van...@cloudera.com wrote: Ah, I see. yeah, it sucks that Spark has to expose Optional (and things it depends on), but removing that would break the public API, so... One last thing you could try is do add your newer Guava jar to spark.driver.extraClassPath and spark.executor.extraClassPath. Those settings will place your jars before Spark's in the classpath, so you'd actually be using the newer versions of the conflicting classes everywhere. It does require manually distributing the Guava jar to the same location on all nodes in the cluster, though. If that doesn't work. Thomas's suggestion of shading Guava in your app can be used as a last resort. On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: The problem is with 1.3.1 It has Function class (mentioned in exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually backport to 1.2.2, which is working fine. *From:* Marcelo Vanzin [mailto:van...@cloudera.com] *Sent:* Thursday, May 14, 2015 6:27 PM *To:* Anton Brazhnyk *Cc:* user@spark.apache.org *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial deployments What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes). Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat. On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. Exception happens when some components try to use “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders: java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances to be changed in Guava significantly, so any “external” Guava can provide them. So, could such problems be fixed if those Spark’s pieces of Guava would be in separate jar and could be excluded from the mix (substituted by “external” Guava)? Thanks, Anton -- Marcelo -- Marcelo
Re: SaveAsTextFile brings down data nodes with IO Exceptions
Hey, did you find any solution for this issue? We are seeing similar logs in our data node logs. Appreciate any help.

2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010 remote=/192.168.112.190:46253]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read1(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.read(Unknown Source)
    at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
    at java.lang.Thread.run(Unknown Source)

Thanks, Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
Hi all, as the last stage of execution, I am writing out a dataset to disk. Before I do this, I force the DAG to resolve so this is the only job left in the pipeline. The dataset in question is not especially large (a few gigabytes). During this step, however, HDFS will inevitably crash. I will lose connection to data nodes and get stuck in the loop of death – failure causes job restart, eventually causing the overall job to fail. On the data node logs I see the errors below. Does anyone have any ideas as to what is going on here? Thanks!
java.io.IOException: Premature EOF from inputStream
    at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
    at java.lang.Thread.run(Thread.java:745)

innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing WRITE_BLOCK operation src: /10.37.248.60:44676 dst: /10.37.248.59:1004
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
    at
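The pattern Ilya describes, forcing the upstream DAG to materialize and then writing as the only remaining job, is roughly the following sketch (names and the output path are placeholders, not from the original post):

    // Sketch of the pattern described above; "pipeline" stands in for the job's RDD.
    val resolved = pipeline.cache()
    resolved.count()                        // forces the upstream DAG to execute
    resolved.saveAsTextFile("hdfs:///out")  // the write during which the data nodes fail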
Re: Spark Job execution time
It does depend on the network I/O within your cluster and on CPU usage. That said, the difference in run time should not be huge (assuming you are not running any other job on the cluster in parallel). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-execution-time-tp22882p22903.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
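If you just want a quick number from the driver side, the crudest measurement is timing the action itself; a sketch (someRdd is a placeholder for whatever the job computes, and the Spark UI gives a far more detailed per-stage breakdown):

    // Rough wall-clock timing of a single action from the driver (sketch only).
    val start = System.nanoTime()
    val n = someRdd.count()                        // someRdd is a placeholder
    val elapsedSec = (System.nanoTime() - start) / 1e9
    println(s"count = $n, took $elapsedSec s")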
Re: SaveAsTextFile brings down data nodes with IO Exceptions
I am seeing this on Hadoop version 2.4.0. Thanks for your suggestions, I will try those and let you know if they help!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran ste...@hortonworks.com wrote:
What version of Hadoop are you seeing this on?

On 15 May 2015, at 20:03, Puneet Kapoor puneet.cse.i...@gmail.com wrote:
Hey, did you find any solution for this issue? We are seeing similar logs in our data node logs. Appreciate any help.

2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010 remote=/192.168.112.190:46253]
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read1(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.read(Unknown Source)
    at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
    at java.lang.Thread.run(Unknown Source)

That's being logged @ error level in the DN. It doesn't mean the DN has crashed, only that it timed out waiting for data: something has gone wrong elsewhere. https://issues.apache.org/jira/browse/HDFS-693

There are a couple of properties you can set to extend the timeouts:

<property>
  <name>dfs.socket.timeout</name>
  <value>2</value>
</property>
<property>
  <name>dfs.datanode.socket.write.timeout</name>
  <value>2</value>
</property>

You can also increase the number of data node transceiver threads to handle data IO across the network:

<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>4096</value>
</property>

Yes, that property has that explicit spelling, it's easy to get wrong
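Those properties are datanode-side settings and belong in hdfs-site.xml on the cluster. If you also want the Spark job's own HDFS client to wait longer, the same keys can be set on the Hadoop configuration that saveAsTextFile uses; a sketch, where the 600000 ms value and the output path are illustrative rather than anything from this thread:

    // Client-side counterpart (sketch only): these keys are also read by the
    // HDFS client that Spark uses when writing. The 600000 ms (10 minute) value
    // and the output path are placeholders, not recommendations from the thread.
    sc.hadoopConfiguration.set("dfs.socket.timeout", "600000")
    sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "600000")
    rdd.saveAsTextFile("hdfs:///tmp/output")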
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
I've just been through this exact case with shaded guava in our Mesos setup and that is how it behaves there (with Spark 1.3.1). cheers, Tom On Fri, May 15, 2015 at 12:04 PM, Marcelo Vanzin van...@cloudera.com wrote: On Fri, May 15, 2015 at 11:56 AM, Thomas Dudziak tom...@gmail.com wrote: Actually the extraClassPath settings put the extra jars at the end of the classpath so they won't help. Only the deprecated SPARK_CLASSPATH puts them at the front. That's definitely not the case for YARN: https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1013 And it's been like that for as far as I remember. I'm almost sure that's also the case for standalone, at least in master / 1.4, since I touched a lot of that code recently. It would be really weird if those options worked differently from SPARK_CLASSPATH, since they were meant to replace it. On Fri, May 15, 2015 at 11:54 AM, Marcelo Vanzin van...@cloudera.com wrote: Ah, I see. yeah, it sucks that Spark has to expose Optional (and things it depends on), but removing that would break the public API, so... One last thing you could try is do add your newer Guava jar to spark.driver.extraClassPath and spark.executor.extraClassPath. Those settings will place your jars before Spark's in the classpath, so you'd actually be using the newer versions of the conflicting classes everywhere. It does require manually distributing the Guava jar to the same location on all nodes in the cluster, though. If that doesn't work. Thomas's suggestion of shading Guava in your app can be used as a last resort. On Thu, May 14, 2015 at 7:38 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: The problem is with 1.3.1 It has Function class (mentioned in exception) in spark-network-common_2.10-1.3.1.jar. Our current resolution is actually backport to 1.2.2, which is working fine. *From:* Marcelo Vanzin [mailto:van...@cloudera.com] *Sent:* Thursday, May 14, 2015 6:27 PM *To:* Anton Brazhnyk *Cc:* user@spark.apache.org *Subject:* Re: Spark's Guava pieces cause exceptions in non-trivial deployments What version of Spark are you using? The bug you mention is only about the Optional class (and a handful of others, but none of the classes you're having problems with). All other Guava classes should be shaded since Spark 1.2, so you should be able to use your own version of Guava with no problems (aside from the Optional classes). Also, Spark 1.3 added some improvements to how shading is done, so if you're using 1.2 I'd recommend trying 1.3 before declaring defeat. On Thu, May 14, 2015 at 4:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I have a relatively complex application with Spark, Jetty and Guava (16) not fitting together. 
Exception happens when some components try to use “mix” of Guava classes (including Spark’s pieces) that are loaded by different classloaders: java.lang.LinkageError: loader constraint violation: when resolving method com.google.common.collect.Iterables.transform(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable; the class loader (instance of org/eclipse/jetty/webapp/WebAppClassLoader) of the current class, org/apache/cassandra/db/ColumnFamilyStore, and the class loader (instance of java/net/URLClassLoader) for resolved class, com/google/common/collect/Iterables, have different Class objects for the type e;Lcom/google/common/base/Function;)Ljava/lang/Iterable; used in the signature According to https://issues.apache.org/jira/browse/SPARK-4819 it’s not going to be fixed at least until Spark 2.0, but maybe some workaround is possible? Those classes are pretty simple and have low chances to be changed in Guava significantly, so any “external” Guava can provide them. So, could such problems be fixed if those Spark’s pieces of Guava would be in separate jar and could be excluded from the mix (substituted by “external” Guava)? Thanks, Anton -- Marcelo -- Marcelo -- Marcelo
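If the classpath route doesn't help, the "shade Guava inside your own jar" fallback mentioned above can be expressed with sbt-assembly's shade rules; a sketch, assuming a plugin version with shading support is added in project/plugins.sbt (the relocation prefix is arbitrary):

    // build.sbt fragment (sketch): relocate the Guava classes bundled in the
    // application jar so they can no longer collide with Spark's copies.
    // "myapp.shaded.guava" is an arbitrary prefix chosen for illustration.
    assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
    )

For Maven builds, the shade plugin's relocation feature achieves the same effect.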