Re: Spark REPL question
Thanks a lot. By "spins up", do you mean using the same directory, specified by the following?

/** Local directory to save .class files to */
val outputDir = {
  val tmp = System.getProperty("java.io.tmpdir")
  val rootDir = new SparkConf().get("spark.repl.classdir", tmp)
  Utils.createTempDir(rootDir)
}

val virtualDirectory = new PlainFile(outputDir) // directory for classfiles

/** Jetty server that will serve our classes to worker nodes */
val classServer = new HttpServer(outputDir)
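For anyone following along: the class directory above can be redirected away from java.io.tmpdir through the spark.repl.classdir property read in the snippet. A hedged one-line example (the path is just a placeholder, not from this thread):

spark-shell --conf spark.repl.classdir=/var/tmp/spark-repl-classes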
Re: Working Formula for Hive 0.13?
The API change does not seem major. I have changed it locally and compiled, but not tested yet. The major problem is still how to resolve the hive-exec jar dependency. I am willing to help on this issue. Is it better to stick to the same approach as hive-0.12 until hive-exec is cleaned up enough to switch back?
Re: Working Formula for Hive 0.13?
I can compile with no errors, but my patch also includes other stuff.
Re: Working Formula for Hive 0.13?
Here is the patch. Please ignore the pom.xml-related change, which is just for compilation purposes. I need to do further work on this one based on Wandou's previous work.
Re: Working Formula for Hive 0.13?
Sorry, I forgot to upload the file. I have never posted before :)

hive.diff http://apache-spark-developers-list.1001551.n3.nabble.com/file/n/hive.diff
Re: Working Formula for Hive 0.13?
Attached is the diff for PR SPARK-2706. I am currently working on this problem. If anybody else is also working on this, we can share the load.
Spark testsuite error for hive 0.13.
I am trying to change Spark to support hive-0.13, but I always hit the following problem when running the tests. My feeling is that the test setup may need to change, but I don't know exactly how. Has anyone seen a similar issue, or can anyone shed light on it?

13:50:53.331 ERROR org.apache.hadoop.hive.ql.Driver: FAILED: SemanticException [Error 10072]: Database does not exist: default
org.apache.hadoop.hive.ql.parse.SemanticException: Database does not exist: default
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getDatabase(BaseSemanticAnalyzer.java:1302)
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getDatabase(BaseSemanticAnalyzer.java:1291)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9944)
    at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:391)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:291)
    at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:944)
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1009)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:880)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:870)
    at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:292)
    at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:266)
    at org.apache.spark.sql.hive.test.TestHiveContext.runSqlHive(TestHive.scala:83)
    at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
    at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:405)
    at org.apache.spark.sql.hive.test.TestHiveContext$SqlCmd$$anonfun$cmd$1.apply$mcV$sp(TestHive.scala:164)
    at org.apache.spark.sql.hive.test.TestHiveContext$$anonfun$loadTestTable$2.apply(TestHive.scala:282)
    at org.apache.spark.sql.hive.test.TestHiveContext$$anonfun$loadTestTable$2.apply(TestHive.scala:282)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.sql.hive.test.TestHiveContext.loadTestTable(TestHive.scala:282)
    at org.apache.spark.sql.hive.CachedTableSuite.init(CachedTableSuite.scala:28)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:374)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:621)
    at sbt.ForkMain$Run$2.call(ForkMain.java:294)
    at sbt.ForkMain$Run$2.call(ForkMain.java:284)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Database does not exist: default
    at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getDatabase(BaseSemanticAnalyzer.java:1298)
    ... 35 more
Re: Spark testsuite error for hive 0.13.
Thanks Sean. I changed both the API and the version because there are some incompatibilities with hive-0.13, and I can actually do some basic operations against a real Hive environment. But the test suite always complains with the "no default database" message. No clue yet.
Re: Spark testsuite error for hive 0.13.
Problem solved by a workaround: creating the database and then using it explicitly.
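For readers hitting the same test failure, a minimal sketch of that kind of workaround, assuming a HiveContext named hiveContext in the test setup (the exact statements used in the actual fix are not shown in this thread):

// run before the test tables are created
hiveContext.sql("CREATE DATABASE IF NOT EXISTS default")
hiveContext.sql("USE default")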
Re: spark.akka.frameSize stalls job in 1.1.0
Is it because countByValue or toArray puts too much stress on the driver when there are many unique words? To me it is a typical word-count problem, so you can solve it as follows (correct me if I am wrong):

val textFile = sc.textFile("file")
val counts = textFile.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
counts.saveAsTextFile("file") // in any case, you don't want to collect results to the master; write them to a file instead

Thanks.

Zhan Zhang

On Aug 16, 2014, at 9:18 AM, Jerry Ye jerr...@gmail.com wrote:

The job ended up running overnight with no progress. :-(

On Sat, Aug 16, 2014 at 12:16 AM, Jerry Ye jerr...@gmail.com wrote:

Hi Xiangrui, I actually tried branch-1.1 and master and it resulted in the job being stuck at the TaskSetManager:

14/08/16 06:55:48 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 8: ip-10-226-199-225.us-west-2.compute.internal (PROCESS_LOCAL)
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 28055875 bytes in 162 ms
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 0: ip-10-249-53-62.us-west-2.compute.internal (PROCESS_LOCAL)
14/08/16 06:55:48 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 28055875 bytes in 178 ms

It's been 10 minutes with no progress on relatively small data. I'll let it run overnight and update in the morning. Is there some place that I should look to see what is happening? I tried to ssh into the executor and look at /root/spark/logs, but there wasn't anything informative there. I'm sure using countByValue works fine, but my use of a HashMap is only an example. In my actual task, I'm loading a Trie data structure to perform efficient string matching between a dataset of locations and strings possibly containing mentions of locations. This seems like a common thing, to process input with a relatively memory-intensive object like a Trie. I hope I'm not missing something obvious. Do you know of any example code like my use case? Thanks!

- jerry
Re: spark.akka.frameSize stalls job in 1.1.0
Not sure exactly how you use it. My understanding is that in Spark it is better to keep the overhead on the driver as low as possible. Is it possible to broadcast the trie to the executors, do the computation there, and then aggregate the counters in the reduce phase?

Thanks.

Zhan Zhang

On Aug 18, 2014, at 8:54 AM, Jerry Ye jerr...@gmail.com wrote:

Hi Zhan, Thanks for looking into this. I'm actually using the hash map as an example of the simplest snippet of code that is failing for me. I know that this is just the word count. In my actual problem I'm using a Trie data structure to find substring matches.
RE: Working Formula for Hive 0.13?
I have a preliminary patch against Spark 1.0.2, which is attached to SPARK-2706. Now I am working on supporting both hive-0.12 and hive-0.13.1 in a non-intrusive way (not breaking anything that already works with hive-0.12 while introducing support for the new version). I will attach a proposal for solving the multi-version support issue to SPARK-2706 soon.

Thanks.

Zhan Zhang
Re: HiveShim not found when building in Intellij
-Phive enables hive-0.13.1, and -Phive -Phive-0.12.0 enables hive-0.12.0. Note that the thrift-server is not yet supported with hive-0.13, but it is expected to go upstream soon (SPARK-3720).

Thanks.

Zhan Zhang

On Oct 28, 2014, at 9:09 PM, Stephen Boesch java...@gmail.com wrote:

Thanks Patrick for the heads up. I have not been successful in discovering a combination of profiles (i.e. enabling hive, hive-0.12.0, or hive-0.13.0) that works in IntelliJ with maven. Anyone who knows how to handle this - a quick note here would be appreciated.

2014-10-28 20:20 GMT-07:00 Patrick Wendell pwend...@gmail.com:

Hey Stephen, In some cases in the maven build we now have pluggable source directories based on profiles, using the maven build helper plug-in. This is necessary to support cross-building against different Hive versions, and there will be additional instances of this due to supporting Scala 2.11 and 2.10. In these cases, you may need to add source locations explicitly to IntelliJ if you want the entire project to compile there. Unfortunately, as long as we support cross-building like this, it will be an issue. IntelliJ's maven support does not correctly detect our use of the build helper plugin to add source directories. We should come up with a good set of instructions on how to import the pom files + add the few extra source directories. Off hand I am not sure exactly what the correct sequence is.

- Patrick

On Tue, Oct 28, 2014 at 7:57 PM, Stephen Boesch java...@gmail.com wrote:

Hi Matei, Until my latest pull from upstream/master it had not been necessary to add the hive profile: is it now? I am not using sbt gen-idea. The way to open in IntelliJ has been to open the parent directory. IJ recognizes it as a maven project. There are several steps to do surgery on the yarn-parent / yarn projects, then do a full rebuild. That was working until one week ago. IntelliJ/maven is presently broken in two ways: (1) this hive shim (which may yet hopefully be a small/simple fix - let us see) and (2) the NoClassDefFoundError on ThreadFactoryBuilder from my prior emails - which is quite a serious problem.

2014-10-28 19:46 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com:

Hi Stephen, How did you generate your Maven workspace? You need to make sure the Hive profile is enabled for it. For example sbt/sbt -Phive gen-idea.

Matei

On Oct 28, 2014, at 7:42 PM, Stephen Boesch java...@gmail.com wrote:

I have run on the command line via maven and it is fine:

mvn -Dscalastyle.failOnViolation=false -DskipTests -Pyarn -Phadoop-2.3 compile package install

But with the latest code IntelliJ builds do not work. Following is one of 26 similar errors:

Error:(173, 38) not found: value HiveShim
Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize))
                                   ^
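For anyone hitting this later, a hedged sketch of the two build variants being discussed; the non-Hive flags below are just carried over from the command quoted in this thread, not prescribed:

mvn -Phive -Pyarn -Phadoop-2.3 -DskipTests clean package
mvn -Phive -Phive-0.12.0 -Pyarn -Phadoop-2.3 -DskipTests clean package

The first builds against Hive 0.13.1 (the default for the -Phive profile at that time); the second pins the build to Hive 0.12.0.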
How spark and hive integrate in long term?
Spark and Hive integration is now a very nice feature, but I am wondering what the long-term roadmap is for Spark's integration with Hive. Both of these projects are undergoing fast improvement and change. Currently, my understanding is that the Spark SQL Hive support relies on the Hive metastore and basic parser to operate, and the thrift-server intercepts Hive queries and replaces the execution with its own engine. With every release of Hive, a significant effort is needed on the Spark side to support it. For the metastore part, we could possibly replace it with HCatalog. But given the dependency of the other parts on Hive, e.g., metastore and thriftserver, HCatalog may not be able to help much. Does anyone have any insight or ideas in mind?

Thanks.

Zhan Zhang
Re: How spark and hive integrate in long term?
Thanks Dean, for the information. Hive-on-Spark is nice. Spark SQL has the advantage of taking full advantage of Spark and allowing users to manipulate the table as an RDD through native Spark support. When I tried to upgrade the current hive-0.13.1 support to hive-0.14.0, I found the Hive parser is not compatible any more. In the meantime, the new features introduced in hive-0.14, e.g., ACID, are not there yet. Spark 1.2 also has some nice features added which are supported by the thrift-server too, e.g., hive-0.13 and table caching. Given that both have more and more features added, it would be great if users could take advantage of both. Currently, Spark SQL gives us such benefits partially, but I am wondering how to keep such integration in the long term.

Thanks.

Zhan Zhang

On Nov 21, 2014, at 3:12 PM, Dean Wampler deanwamp...@gmail.com wrote:

I can't comment on plans for Spark SQL's support for Hive, but several companies are porting Hive itself onto Spark: http://blog.cloudera.com/blog/2014/11/apache-hive-on-apache-spark-the-first-demo/ I'm not sure if they are leveraging the old Shark code base or not, but it appears to be a fresh effort.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com
Re: How spark and hive integrate in long term?
Thanks Cheng for the insights. Regarding HCatalog, I did some initial investigation too and agree with you; as of now, it does not seem to be a good solution. I will try to talk to the Hive people to see whether there is such a guarantee of downward compatibility for the Thrift protocol. By the way, I tried some basic functions using hive-0.13 connected to a hive-0.14 metastore, and it looks like they are compatible.

Thanks.

Zhan Zhang

On Nov 22, 2014, at 7:14 AM, Cheng Lian lian.cs@gmail.com wrote:

Should emphasize that this is still a quick and rough conclusion; we will investigate this in more detail after the 1.2.0 release. Anyway, we really would like to make Hive support in Spark SQL as smooth and clean as possible for both developers and end users.

On 11/22/14 11:05 PM, Cheng Lian wrote:

Hey Zhan, This is a great question. We are also seeking a stable API/protocol that works with multiple Hive versions (esp. 0.12+). SPARK-4114 https://issues.apache.org/jira/browse/SPARK-4114 was opened for this. I did some research into HCatalog recently, but I must confess that I'm not an expert on HCatalog; I actually spent only one day exploring it. So please don't hesitate to correct me if I am wrong about the conclusions below.

First, although the HCatalog API is more pleasant to work with, it is unfortunately feature-incomplete. It only provides a subset of the most commonly used operations. For example, HCatCreateTableDesc maps only a subset of CreateTableDesc; properties like storeAsSubDirectories, skewedColNames and skewedColValues are missing. It is also impossible to alter table properties via the HCatalog API (Spark SQL uses this to implement the ANALYZE command). The hcat CLI tool provides all those features missing from the HCatalog API via the raw Metastore API, and is structurally similar to the old Hive CLI.

Second, the HCatalog API itself doesn't ensure compatibility; it's the Thrift protocol that matters. HCatalog is built directly upon the raw Metastore API, and talks the same Metastore Thrift protocol. The problem we encountered in Spark SQL is that we usually deploy Spark SQL Hive support with an embedded-mode (for testing) or local-mode Metastore, and this makes us suffer from things like Metastore database schema changes. If the Hive Metastore Thrift protocol is guaranteed to be downward compatible, then hopefully we can resort to a remote-mode Metastore and always depend on the most recent Hive APIs. I had a glance at the Thrift protocol version handling code in Hive, and it seems that downward compatibility is not an issue. However, I didn't find any official documents about Thrift protocol compatibility.

That said, in the future, hopefully we can depend only on the most recent Hive dependencies and remove the Hive shim layer introduced in branch-1.2. Users who use exactly the same version of Hive as Spark SQL can use either a remote or a local/embedded Metastore, while users who want to interact with existing legacy Hive clusters have to set up a remote Metastore and let the Thrift protocol handle compatibility.

— Cheng
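As a concrete illustration of the remote-Metastore option discussed above: the standard way to point a client (including Spark SQL's Hive support) at an existing Metastore service is the hive.metastore.uris property in a hive-site.xml on the driver's classpath. The host and port below are placeholders, not values from this thread:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://metastore-host:9083</value>
</property>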
Re: Welcoming three new committers
Congratulations!

On Feb 3, 2015, at 2:34 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hi all, The PMC recently voted to add three new committers: Cheng Lian, Joseph Bradley and Sean Owen. All three have been major contributors to Spark in the past year: Cheng on Spark SQL, Joseph on MLlib, and Sean on ML and many pieces throughout Spark Core. Join me in welcoming them as committers!

Matei
Re: Setting JVM options to Spark executors in Standalone mode
You can try to add it to conf/spark-defaults.conf:

spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

Thanks.

Zhan Zhang

On Jan 16, 2015, at 9:56 AM, Michel Dufresne sparkhealthanalyt...@gmail.com wrote:

Hi All, I'm trying to set some JVM options on the executor processes in a standalone cluster. Here's what I have in spark-env.sh:

jmx_opt="-Dcom.sun.management.jmxremote"
jmx_opt="${jmx_opt} -Djava.net.preferIPv4Stack=true"
jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.port="
jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.rmi.port=9998"
jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.ssl=false"
jmx_opt="${jmx_opt} -Dcom.sun.management.jmxremote.authenticate=false"
jmx_opt="${jmx_opt} -Djava.rmi.server.hostname=${SPARK_PUBLIC_DNS}"
export SPARK_WORKER_OPTS="${jmx_opt}"

However the options are showing up on the daemon JVM, not the workers. It has the same effect as if I was using SPARK_DAEMON_JAVA_OPTS (which should set it on the daemon process).

Thanks in advance for your help,

Michel
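Two hedged alternatives for getting the same options onto the executor JVMs rather than the worker daemon; the property name is the real Spark setting, but the option string is just carried over from the example above:

import org.apache.spark.{SparkConf, SparkContext}

// set programmatically when building the SparkContext
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false")
val sc = new SparkContext(conf)

Equivalently, the same key can be passed on the command line with spark-submit --conf "spark.executor.extraJavaOptions=...".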
Re: Spark-thriftserver Issue
You can try to set it in spark-env.sh:

# - SPARK_LOG_DIR   Where log files are stored. (Default: ${SPARK_HOME}/logs)
# - SPARK_PID_DIR   Where the pid file is stored. (Default: /tmp)

Thanks.

Zhan Zhang

On Mar 24, 2015, at 12:10 PM, Anubhav Agarwal anubha...@gmail.com wrote:

Zhan, specifying the port fixed the port issue. Is it possible to specify the log directory while starting the Spark thriftserver? I am still getting this error even though the folder exists and everyone has permission to use that directory.

drwxr-xr-x 2 root root 4096 Mar 24 19:04 spark-events

Exception in thread main java.lang.IllegalArgumentException: Log directory /tmp/spark-events does not exist.
    at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99)
    at org.apache.spark.SparkContext.init(SparkContext.scala:399)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:49)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:58)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

On Mon, Mar 23, 2015 at 6:51 PM, Zhan Zhang zzh...@hortonworks.com wrote:

Probably the port is already used by others, e.g., Hive. You can change the port similar to below:

./sbin/start-thriftserver.sh --master yarn --executor-memory 512m --hiveconf hive.server2.thrift.port=10001

Thanks.

Zhan Zhang

On Mar 23, 2015, at 12:01 PM, Neil Dev neilk...@gmail.com wrote:

Hi, I am having an issue starting spark-thriftserver. I'm running Spark 1.3 with Hadoop 2.4.0. I would like to be able to change its port too, so I can have the Hive thriftserver as well as the Spark thriftserver running at the same time.

Starting the thrift server:

sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077 --executor-memory 2G

Error: I created the folder manually but still get the following error

Exception in thread main java.lang.IllegalArgumentException: Log directory /tmp/spark-events does not exist.

I am also getting the following error:

15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error: org.apache.thrift.transport.TTransportException: Could not create ServerSocket on address 0.0.0.0/0.0.0.0:1.
    at org.apache.thrift.transport.TServerSocket.init(TServerSocket.java:93)
    at org.apache.thrift.transport.TServerSocket.init(TServerSocket.java:79)
    at org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
    at org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
    at java.lang.Thread.run(Thread.java:745)

Thanks
Neil
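For the "Log directory /tmp/spark-events does not exist" part specifically: that path comes from the event log (history) settings rather than from SPARK_LOG_DIR. A hedged sketch of what could go into conf/spark-defaults.conf (the HDFS path is only an example):

spark.eventLog.enabled  true
spark.eventLog.dir      hdfs:///spark-history

Alternatively, pre-creating /tmp/spark-events on the machine that runs the driver avoids the error without any config change.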
Re: Review request for SPARK-6112:Provide OffHeap support through HDFS RAM_DISK
Thanks Reynold. I agree with you on opening another JIRA to unify the block storage API. I have uploaded the design doc to SPARK-6479 as well.

Thanks.

Zhan Zhang

On Mar 23, 2015, at 4:03 PM, Reynold Xin r...@databricks.com wrote:

I created a ticket to separate the API refactoring from the implementation. It would be great to have these as two separate patches to make it easier to review (similar to the way we are doing the RPC refactoring -- first introducing an internal RPC API, porting Akka to it, and then adding an alternative implementation). https://issues.apache.org/jira/browse/SPARK-6479 Can you upload your design doc there so we can discuss the block store API?

Thanks.

On Mon, Mar 23, 2015 at 3:47 PM, Zhan Zhang zzh...@hortonworks.com wrote:

Hi Folks, I am planning to implement HDFS off-heap support for Spark, and have uploaded the design doc for the off-heap support through HDFS ramdisk in JIRA SPARK-6112. Please review it and provide your feedback if you are interested. https://issues.apache.org/jira/browse/SPARK-6112

Thanks.

Zhan Zhang
Re: Make off-heap store pluggable
Hi Alexey, SPARK-6479 (https://issues.apache.org/jira/browse/SPARK-6479) is for the plugin API, and SPARK-6112 (https://issues.apache.org/jira/browse/SPARK-6112) is for the HDFS plugin.

Thanks.

Zhan Zhang

On Jul 21, 2015, at 10:56 AM, Alexey Goncharuk alexey.goncha...@gmail.com wrote:

2015-07-20 23:29 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com:

I agree with this -- basically, to build on Reynold's point, you should be able to get almost the same performance by implementing either the Hadoop FileSystem API or the Spark Data Source API over Ignite in the right way. This would let people save data persistently in Ignite in addition to using it for caching, and it would provide a global namespace, optionally a schema, etc. You can still provide data locality, short-circuit reads, etc. with these APIs.

Absolutely agree. In fact, Ignite already provides a shared RDD implementation which is essentially a view of Ignite cache data. This implementation adheres to the Spark DataFrame API. More information can be found here: http://ignite.incubator.apache.org/features/igniterdd.html

Also, the Ignite in-memory filesystem is compliant with the Hadoop filesystem API and can transparently replace HDFS if needed. Plugging it into Spark should be fairly easy. More information can be found here: http://ignite.incubator.apache.org/features/igfs.html

--Alexey
Re: Support for views/ virtual tables in SparkSQL
I think you can rewrite those TPC-H queries without using views, for example by registering a DataFrame as a temporary table with registerTempTable.

Thanks.

Zhan Zhang

On Nov 9, 2015, at 9:34 PM, Sudhir Menon <sme...@pivotal.io> wrote:

> Team:
>
> Do we plan to add support for views/virtual tables in SparkSQL anytime soon?
> Trying to run the TPC-H workload and failing on queries that assume support
> for views in the underlying database
>
> Thanks in advance
>
> Suds
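A minimal sketch of that substitution for the 1.5-era API; the table and column names below follow TPC-H Q15 but are only illustrative, not taken from this thread:

// instead of: CREATE VIEW revenue0 AS SELECT ...
val revenue0 = sqlContext.sql(
  "SELECT l_suppkey AS supplier_no, sum(l_extendedprice * (1 - l_discount)) AS total_revenue " +
  "FROM lineitem GROUP BY l_suppkey")
revenue0.registerTempTable("revenue0")

// later queries can then refer to revenue0 as if it were a view
val top = sqlContext.sql("SELECT * FROM revenue0 ORDER BY total_revenue DESC LIMIT 10")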
Re: Proposal for SQL join optimization
Hi Xiao, Performance-wise, without the manual tuning the query cannot finish, and with the tuning the query finishes in minutes on TPC-H 100G data. I have created https://issues.apache.org/jira/browse/SPARK-11704 and https://issues.apache.org/jira/browse/SPARK-11705 for these two issues, and we can move the discussion there.

Thanks.

Zhan Zhang

On Nov 11, 2015, at 6:16 PM, Xiao Li <gatorsm...@gmail.com> wrote:

Hi, Zhan, That sounds really interesting! Please at-mention me when you submit the PR. If possible, please also post the performance difference.

Thanks,

Xiao Li
Proposal for SQL join optimization
Hi Folks, I did some performance measurements based on TPC-H recently, and want to bring up some performance issues I observed. Both are related to cartesian joins.

1. CartesianProduct implementation. Currently CartesianProduct relies on RDD.cartesian, in which the computation is realized as follows:

override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
  val currSplit = split.asInstanceOf[CartesianPartition]
  for (x <- rdd1.iterator(currSplit.s1, context);
       y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}

From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, which is really heavy and may never finish if n is large, especially when rdd2 comes from a ShuffledRDD. We should have some optimization for CartesianProduct by caching rightResults. The problem is that, AFAIK, we don't have a cleanup hook to unpersist rightResults. I think we should have a cleanup hook after query execution. With the hook available, we can easily optimize such cartesian joins, and I believe such a cleanup hook may also benefit other query optimizations.

2. Unnecessary CartesianProduct join. When we have a query similar to the following (I don't remember the exact form):

select * from a, b, c, d where a.key1 = c.key1 and b.key2 = c.key2 and c.key3 = d.key3

there will be a cartesian join between a and b. But if we simply change the table order, for example to a, c, b, d, the cartesian join is eliminated. Without such manual tuning, the query will never finish if a and c are big, and we should not have to rely on such manual optimization.

Please provide your input. If they are both valid, I will open JIRAs for each.

Thanks.

Zhan Zhang
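Until something like the proposed cleanup hook exists, a hedged sketch of the caching idea at the RDD level; the RDDs below are toy placeholders and this is not the proposed CartesianProduct change itself:

import org.apache.spark.storage.StorageLevel

val leftRDD = sc.parallelize(1 to 1000)
val rightRDD = sc.parallelize(1 to 1000)

// persist the right side once so it is not recomputed for every left partition
val right = rightRDD.persist(StorageLevel.MEMORY_AND_DISK)
val product = leftRDD.cartesian(right)
val n = product.count() // any action that consumes the result
right.unpersist()       // manual cleanup, since there is no automatic hook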
Re: spark-shell 1.5 doesn't seem to work in local mode
It does not matter whether you start Spark in local or another mode. If you have an hdfs-site.xml somewhere and the Spark configuration points to that config, you will read/write HDFS.

Thanks.

Zhan Zhang

From: Madhu <ma...@madhu.com>
Sent: Saturday, September 19, 2015 12:14 PM
To: dev@spark.apache.org
Subject: Re: spark-shell 1.5 doesn't seem to work in local mode

Thanks guys. I do have HADOOP_INSTALL set, but Spark 1.4.1 did not seem to mind. Seems like there's a difference in behavior between 1.5.0 and 1.4.1 for some reason. To the best of my knowledge, I just downloaded each tgz and untarred them in /opt. I adjusted my PATH to point to one or the other, but that should be about it. Does 1.5.0 pick up HADOOP_INSTALL? Wouldn't spark-shell --master local override that? 1.5 seemed to completely ignore --master local.

--
Madhu
https://www.linkedin.com/in/msiddalingaiah
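One way to see which filesystem is actually being used, independent of the --master setting, is to make the URI scheme explicit; the paths below are placeholders:

// resolves against the default filesystem from the Hadoop config (HDFS if core-site/hdfs-site point there)
val a = sc.textFile("/tmp/data.txt")

// forces the local filesystem regardless of the Hadoop config
val b = sc.textFile("file:///tmp/data.txt")

// forces HDFS explicitly
val c = sc.textFile("hdfs:///tmp/data.txt")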
Re: Multi-core support per task in Spark
I noticed that it is configurable at the job level via spark.task.cpus. Is there any way to support it at the task level?

Thanks.

Zhan Zhang
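For reference, a hedged sketch of the existing job-level knob mentioned above (the value 4 is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

// every task scheduled in this application reserves 4 CPU slots on its executor
val conf = new SparkConf().setAppName("multi-core-tasks").set("spark.task.cpus", "4")
val sc = new SparkContext(conf)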
Multi-core support per task in Spark
Hi Folks, Is it possible to assign multiple cores per task, and how? Suppose we have a scenario in which some tasks do really heavy processing of each record and require multi-threading, and we want to avoid having similar tasks assigned to the same executors/hosts.

If it is not supported, does it make sense to add this feature? It may seem to make users worry about more configuration, but by default we can still do 1 core per task, and only advanced users need to be aware of this feature.

Thanks.

Zhan Zhang
Dr. appointment this afternoon and WFH tomorrow for another Dr. appointment (EOM)
Re: [Spark SQL] SQLContext getOrCreate incorrect behaviour
This looks to me like a very unusual use case. You stop the SparkContext and start another one; I don't think that is well supported. Once the SparkContext is stopped, all of its resources are supposed to be released. Is there any mandatory reason you have to stop and restart a SparkContext?

Thanks.

Zhan Zhang

On Dec 20, 2015, at 2:59 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Spark developers,
>
> I found that SQLContext.getOrCreate(sc: SparkContext) does not behave
> correctly when a different spark context is provided.
>
> ```
> val sc = new SparkContext
> val sqlContext = SQLContext.getOrCreate(sc)
> sc.stop
> ...
>
> val sc2 = new SparkContext
> val sqlContext2 = SQLContext.getOrCreate(sc2)
> sc2.stop
> ```
>
> The sqlContext2 will reference sc instead of sc2 and therefore, the program
> will not work because sc has been stopped.
>
> Best Regards,
>
> Jerry
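If restarting really is required, a hedged workaround sketch for the 1.x API is to construct the new SQLContext directly against the new SparkContext instead of going through getOrCreate's cached instance:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc2 = new SparkContext(new SparkConf().setAppName("restarted"))
// bypasses getOrCreate's cached reference to the old, stopped context
val sqlContext2 = new SQLContext(sc2)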
Re: ORC file writing hangs in pyspark
Hi James, You can try writing in another format, e.g., Parquet, to see whether it is an ORC-specific issue or a more generic one.

Thanks.

Zhan Zhang

On Feb 23, 2016, at 6:05 AM, James Barney <jamesbarne...@gmail.com> wrote:

I'm trying to write an ORC file after running the FPGrowth algorithm on a dataset of around just 2GB in size. The algorithm performs well and can display results if I take(n) the freqItemSets() of the result after converting that to a DF. I'm using Spark 1.5.2 on HDP 2.3.4 and Python 3.4.2 on YARN. I get the results from querying a Hive table, also in ORC format, and running a number of maps, joins, and filters on the data.

When the program attempts to write the files:

result.write.orc('/data/staged/raw_result')
size_1_buckets.write.orc('/data/staged/size_1_results')
filter_size_2_buckets.write.orc('/data/staged/size_2_results')

the first path, /data/staged/raw_result, is created with a _temporary folder, but the data is never written. The job hangs at this point, apparently indefinitely. Additionally, no logs are recorded or available for the jobs on the history server. What could be the problem?
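Concretely, the isolation test suggested above would be swapping the writer call in the snippet quoted here, e.g. result.write.parquet('/data/staged/raw_result_parquet') (the output path is arbitrary). If the Parquet write also hangs, the problem is more likely in the preceding computation than in the ORC writer.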
Re: RFC: Remove "HBaseTest" from examples?
FYI: there are several pending patches for DataFrame support on top of HBase.

Thanks.

Zhan Zhang

On Apr 20, 2016, at 2:43 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

+1. HBaseTest in the Spark examples is quite old and obsolete; the HBase connector in the HBase repo has evolved a lot, and it would be better to guide users to refer to that rather than the example here in Spark. So it is good to remove it.

Thanks

Saisai

On Wed, Apr 20, 2016 at 1:41 AM, Josh Rosen <joshro...@databricks.com> wrote:

+1; I think that it's preferable for code examples, especially third-party integration examples, to live outside of Spark.

On Tue, Apr 19, 2016 at 10:29 AM Reynold Xin <r...@databricks.com> wrote:

Yea, in general I feel examples that bring in a large amount of dependencies should be outside Spark.

On Tue, Apr 19, 2016 at 10:15 AM, Marcelo Vanzin <van...@cloudera.com> wrote:

Hey all, Two reasons why I think we should remove that from the examples:

- HBase now has Spark integration in its own repo, so that really should be the template for how to use HBase from Spark, making that example less useful, even misleading.
- It brings in a lot of extra dependencies that make the size of the Spark distribution grow.

Any reason why we shouldn't drop that example?

--
Marcelo
Re: [GRAPHX] Graph Algorithms and Spark
You can take a look at this blog from Databricks about GraphFrames: https://databricks.com/blog/2016/03/03/introducing-graphframes.html

Thanks.

Zhan Zhang

On Apr 21, 2016, at 12:53 PM, Robin East <robin.e...@xense.co.uk> wrote:

Hi, Aside from LDA, which is implemented in MLlib, GraphX has the following built-in algorithms:

* PageRank/Personalised PageRank
* Connected Components
* Strongly Connected Components
* Triangle Count
* Shortest Paths
* Label Propagation

It also implements a version of the Pregel framework, a form of bulk-synchronous parallel processing that is the foundation of most of the above algorithms. We cover other algorithms in our book, and if you search on Google you will find a number of other examples.

---
Robin East
Spark GraphX in Action, Michael Malak and Robin East, Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action

On 21 Apr 2016, at 19:47, tgensol <thibaut.gensol...@gmail.com> wrote:

Hi there, I am working in a group at the University of Michigan, and we are trying to make (and first find) some distributed graph algorithms. I know Spark, and I found GraphX. I read the docs, but I only found Latent Dirichlet Allocation algorithms working with GraphX, so I was wondering why. Basically, the group wants to implement Minimum Spanning Tree, kNN, and shortest path at first.

So my questions are: Is GraphX stable enough for developing these kinds of algorithms on it? Do you know of algorithms like these working on top of GraphX? And if not, why do you think nobody has tried? Is it too hard, or just that nobody needs it? Maybe it is only my knowledge of GraphX that is weak, and it is not possible to implement these algorithms with GraphX.

Thanking you in advance, Best regards,

Thibaut
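As a quick illustration of one of the built-ins listed above, a minimal Shortest Paths run on GraphX; the toy edge list and landmark vertex are made up for the sketch:

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.lib.ShortestPaths

// a tiny example graph: 1 -> 2 -> 3, 1 -> 3
val edges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 3L)))
val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)

// hop counts from every vertex to the landmark vertex 3
val result = ShortestPaths.run(graph, Seq(3L))
result.vertices.collect().foreach { case (id, spMap) => println(s"$id -> $spMap") }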
Re: right outer joins on Datasets
The reason for the "-1" is that -1 is the default value used for an Integer when the value is null:

def defaultValue(jt: String): String = jt match {
  ...
  case JAVA_INT => "-1"
  ...
}
Re: more uniform exception handling?
+1. Both of these would be very helpful in debugging.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 1:18 PM, Evan Chan <velvia.git...@gmail.com> wrote:

> +1000.
>
> Especially if the UI can help correlate exceptions, and we can reduce
> some exceptions.
>
> There are some exceptions which are in practice very common, such as
> the nasty ClassNotFoundException, that most folks end up spending tons
> of time debugging.
>
> On Mon, Apr 18, 2016 at 12:16 PM, Reynold Xin <r...@databricks.com> wrote:
>> Josh's pull request on RPC exception handling got me to think ...
>>
>> In my experience, there have been a few exception-related things that
>> created a lot of trouble for us in production debugging:
>>
>> 1. Some exception is thrown, but is caught by some try/catch that does not
>> do any logging nor rethrow.
>> 2. Some exception is thrown, but is caught by some try/catch that does not
>> do any logging, but does rethrow. But the original exception is now masked.
>> 3. Multiple exceptions are logged at different places close to each other,
>> but we don't know whether they are caused by the same problem or not.
>>
>> To mitigate some of the above, here's an idea ...
>>
>> (1) Create a common root class for all the exceptions (e.g. call it
>> SparkException) used in Spark. We should make sure every time we catch an
>> exception from a 3rd party library, we rethrow it as SparkException (a lot
>> of places already do that). In SparkException's constructor, log the
>> exception and the stacktrace.
>>
>> (2) SparkException has a monotonically increasing ID, and this ID appears in
>> the exception error message (say at the end).
>>
>> I think (1) will eliminate most of the cases where an exception gets
>> swallowed. The main downside I can think of is we might log an exception
>> multiple times. However, I'd argue exceptions should be rare, and it is not
>> that big of a deal to log them twice or three times. The unique ID (2) can
>> help us correlate exceptions if they appear multiple times.
>>
>> Thoughts?
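A rough sketch of what (1) and (2) from the quoted proposal could look like; this is purely illustrative and not the actual Spark implementation:

import java.util.concurrent.atomic.AtomicLong
import org.slf4j.LoggerFactory

object ExceptionId {
  private val counter = new AtomicLong(0L)
  def next(): Long = counter.incrementAndGet()
}

class SparkException(message: String, cause: Throwable = null)
  extends Exception(message + s" (exceptionId=${ExceptionId.next()})", cause) {
  // log at construction time so the exception is recorded even if a caller swallows it
  SparkException.log.error(getMessage, this)
}

object SparkException {
  private val log = LoggerFactory.getLogger(classOf[SparkException])
}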
Re: SparkSQL - Limit pushdown on BroadcastHashJoin
From the physical plan, the limit is one level above the WholeStageCodegen; thus, I don't think shouldStop would work here. To make it work, the limit would have to be part of the whole-stage codegen. Correct me if I am wrong.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:

I could be wrong, but I think we currently do that through whole-stage codegen. After processing every row on the stream side, the generated code for the broadcast join checks whether it has hit the limit or not (through this thing called shouldStop). It is not the most optimal solution, because a single stream-side row might output multiple hits, but it is usually not a problem.

On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com> wrote:

While you can't automatically push the limit *through* the join, we could push it *into* the join (stop processing after generating 10 records). I believe that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:

I am not sure if you can push a limit through a join. This becomes problematic if not all keys are present on both sides; in such a case a limit can produce fewer rows than the set limit. This might be a rare case in which whole-stage codegen is slower, due to the fact that we need to buffer the result of such a stage. You could try to disable it by setting "spark.sql.codegen.wholeStage" to false.

2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:

Hi, I ran the following query in Spark (latest master codebase) and it took a lot of time to complete even though it was a broadcast hash join. It appears that the limit computation is done only after computing the complete join. Shouldn't the limit condition be pushed into BroadcastHashJoin (wherein it would have to stop processing after generating 10 rows)? Please let me know if my understanding of this is wrong.

select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;

== Physical Plan ==
CollectLimit 10
+- WholeStageCodegen
   :  +- Project [l_partkey#893]
   :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
   :        :- Project [l_partkey#893]
   :        :  +- Filter isnotnull(l_partkey#893)
   :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct
   :        +- INPUT
   +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
      +- WholeStageCodegen
         :  +- Project [ps_partkey#908]
         :     +- Filter isnotnull(ps_partkey#908)
         :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct

--
~Rajesh.B
Re: SparkSQL - Limit pushdown on BroadcastHashJoin
Hi Reynold,

I just checked the code for CollectLimit; there is a shuffle happening to collect the rows in one partition:

protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}

Thus, there is no way to avoid processing all data before the shuffle. I think that is the reason. Do I understand correctly? (An RDD-level sketch of a limit applied before the exchange follows after the quoted thread below.)

Thanks.

Zhan Zhang

On Apr 18, 2016, at 10:08 PM, Reynold Xin <r...@databricks.com<mailto:r...@databricks.com>> wrote:

Unless I'm really missing something, I don't think so. As I said, it goes through an iterator, and after processing each stream-side row we do a shouldStop check. The generated code looks like:

/* 094 */   protected void processNext() throws java.io.IOException {
/* 095 */     /*** PRODUCE: Project [id#79L] */
/* 096 */
/* 097 */     /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
/* 098 */
/* 099 */     /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */
/* 100 */
/* 101 */     // initialize Range
/* 102 */     if (!range_initRange) {
/* 103 */       range_initRange = true;
/* 104 */       initRange(partitionIndex);
/* 105 */     }
/* 106 */
/* 107 */     while (!range_overflow && range_number < range_partitionEnd) {
/* 108 */       long range_value = range_number;
/* 109 */       range_number += 1L;
/* 110 */       if (range_number < range_value ^ 1L < 0) {
/* 111 */         range_overflow = true;
/* 112 */       }
/* 113 */
/* 114 */       /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
/* 115 */
/* 116 */       // generate join key for stream side
/* 117 */
/* 118 */       // find matches from HashedRelation
/* 119 */       UnsafeRow bhj_matched = false ? null : (UnsafeRow) bhj_relation.getValue(range_value);
/* 120 */       if (bhj_matched == null) continue;
/* 121 */
/* 122 */       bhj_metricValue.add(1);
/* 123 */
/* 124 */       /*** CONSUME: Project [id#79L] */
/* 125 */
/* 126 */       System.out.println("i got one row");
/* 127 */
/* 128 */       /*** CONSUME: WholeStageCodegen */
/* 129 */
/* 130 */       project_rowWriter.write(0, range_value);
/* 131 */       append(project_result);
/* 132 */
/* 133 */       if (shouldStop()) return;
/* 134 */     }
/* 135 */   }
/* 136 */ }

shouldStop is false once we go past the limit.

On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:

From the physical plan, the limit is one level above the WholeStageCodegen. Thus, I don’t think shouldStop would work here. To make it work, the limit has to be part of the WholeStageCodegen. Correct me if I am wrong. Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com<mailto:r...@databricks.com>> wrote:

I could be wrong, but I think we currently do that through whole-stage codegen. After processing every row on the stream side, the generated code for the broadcast join checks whether it has hit the limit or not (through this thing called shouldStop). It is not the most optimal solution, because a single stream-side row might output multiple hits, but it is usually not a problem.

On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com<mailto:ray.and...@gmail.com>> wrote:

While you can't automatically push the limit *through* the join, we could push it *into* the join (stop processing after generating 10 records). I believe that is what Rajesh is suggesting.
On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl<mailto:hvanhov...@questtec.nl>> wrote:

I am not sure you can push a limit through a join. This becomes problematic if not all keys are present on both sides; in such a case a limit can produce fewer rows than the set limit.

This might be a rare case in which whole-stage codegen is slower, due to the fact that we need to buffer the result of such a stage. You could try to disable it by setting "spark.sql.codegen.wholeStage" to false.

2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com<mailto:rajesh.balamo...@gmail.com>>:

Hi,

I ran the following query in spark (latest master codebase) and it took a lot of time to complete even though it was a broadcast hash join. It appears that the limit computation is done only after computing the complete join. Shouldn't the limit condition be pushed into BroadcastHashJoin (wherein it would have to stop processing after generating 10 rows)? Please let me know if my understanding of this is wrong.

select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;

== Physical Plan ==
CollectLimit 10
+- WholeStageCodegen
   :  +- Project [l_partkey#893]
   :     +- BroadcastHa
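[Editorial note] On the doExecute snippet quoted above: the take only happens after the single-partition exchange, so every child row is shuffled. The following stand-alone RDD-level sketch (plain RDD API, assuming an existing SparkContext sc; it is not what CollectLimit currently does) shows how a take applied before the exchange would bound the shuffled rows, even though every upstream partition still has to be computed once:

val limit = 10
val rows = sc.parallelize(1 to 1000000, 8)   // 8 upstream partitions

val limited = rows
  .mapPartitions(_.take(limit))   // "local" limit: at most 8 * 10 rows enter the shuffle
  .repartition(1)                 // analogous to the SinglePartition exchange in CollectLimit
  .mapPartitions(_.take(limit))   // "global" limit on the single shuffled partition

limited.collect()                 // returns 10 elements; the shuffle moved at most 80 rows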
Re: SparkSQL - Limit pushdown on BroadcastHashJoin
Thanks, Reynold. Not sure why doExecute is not invoked, since CollectLimit does not support whole-stage codegen:

case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {

I will dig further into this. (See the editorial note after the quoted thread below for one plausible explanation.)

Zhan Zhang

On Apr 18, 2016, at 10:36 PM, Reynold Xin <r...@databricks.com<mailto:r...@databricks.com>> wrote:

Anyway, we can verify this easily. I just added a println to each row and verified that only limit + 1 rows were printed after the join and before the limit. It'd be great if you do some debugging yourself and see if it is going through some other code path.

On Mon, Apr 18, 2016 at 10:35 PM, Reynold Xin <r...@databricks.com<mailto:r...@databricks.com>> wrote:

But doExecute is not called?

On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang <zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:

Hi Reynold,

I just checked the code for CollectLimit; there is a shuffle happening to collect the rows in one partition:

protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}

Thus, there is no way to avoid processing all data before the shuffle. I think that is the reason. Do I understand correctly?

Thanks.

Zhan Zhang

On Apr 18, 2016, at 10:08 PM, Reynold Xin <r...@databricks.com<mailto:r...@databricks.com>> wrote:

Unless I'm really missing something, I don't think so. As I said, it goes through an iterator, and after processing each stream-side row we do a shouldStop check. The generated code looks like:

/* 094 */   protected void processNext() throws java.io.IOException {
/* 095 */     /*** PRODUCE: Project [id#79L] */
/* 096 */
/* 097 */     /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
/* 098 */
/* 099 */     /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */
/* 100 */
/* 101 */     // initialize Range
/* 102 */     if (!range_initRange) {
/* 103 */       range_initRange = true;
/* 104 */       initRange(partitionIndex);
/* 105 */     }
/* 106 */
/* 107 */     while (!range_overflow && range_number < range_partitionEnd) {
/* 108 */       long range_value = range_number;
/* 109 */       range_number += 1L;
/* 110 */       if (range_number < range_value ^ 1L < 0) {
/* 111 */         range_overflow = true;
/* 112 */       }
/* 113 */
/* 114 */       /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
/* 115 */
/* 116 */       // generate join key for stream side
/* 117 */
/* 118 */       // find matches from HashedRelation
/* 119 */       UnsafeRow bhj_matched = false ? null : (UnsafeRow) bhj_relation.getValue(range_value);
/* 120 */       if (bhj_matched == null) continue;
/* 121 */
/* 122 */       bhj_metricValue.add(1);
/* 123 */
/* 124 */       /*** CONSUME: Project [id#79L] */
/* 125 */
/* 126 */       System.out.println("i got one row");
/* 127 */
/* 128 */       /*** CONSUME: WholeStageCodegen */
/* 129 */
/* 130 */       project_rowWriter.write(0, range_value);
/* 131 */       append(project_result);
/* 132 */
/* 133 */       if (shouldStop()) return;
/* 134 */     }
/* 135 */   }
/* 136 */ }

shouldStop is false once we go past the limit.

On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com<mailto:zzh...@hortonworks.com>> wrote:

From the physical plan, the limit is one level above the WholeStageCodegen. Thus, I don’t think shouldStop would work here. To make it work, the limit has to be part of the WholeStageCodegen. Correct me if I am wrong. Thanks.
Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com<mailto:r...@databricks.com>> wrote:

I could be wrong, but I think we currently do that through whole-stage codegen. After processing every row on the stream side, the generated code for the broadcast join checks whether it has hit the limit or not (through this thing called shouldStop). It is not the most optimal solution, because a single stream-side row might output multiple hits, but it is usually not a problem.

On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com<mailto:ray.and...@gmail.com>> wrote:

While you can't automatically push the limit *through* the join, we could push it *into* the join (stop processing after generating 10 records). I believe that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl<mailto:hvanhov...@questtec.nl>> wrote:

I am not sure you can push a limit through a join. This becomes problematic if not all keys are present on both sides; in such a case a limit can produce fewer rows than the set limit. This might be a rare case in which whole-stage codegen is slower, due to the fact that we need to buffer the result
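[Editorial note] One plausible answer to the open question above ("not sure why doExecute is not invoked"), based on reading the SparkPlan code of that period rather than anything stated in this thread, so treat it as an assumption to verify: collect() does not go through doExecute() at all. It calls SparkPlan.executeCollect, and CollectLimit overrides executeCollect to call child.executeTake(limit), which pulls partitions incrementally and stops once limit rows are in hand. That would also explain why only limit + 1 rows were printed. A quick runnable check of the two paths, using the 2.x-style API for brevity (spark is a SparkSession):

// Build a join-shaped query with a terminal limit.
val df = spark.range(0, 1000000)
  .join(spark.range(0, 1000000), "id")
  .limit(10)

// Path 1: collect() -> executeCollect; for a terminal limit this is expected to
// take rows incrementally, with no extra shuffle stage for the limit in the UI.
df.collect()

// Path 2: going through the RDD forces doExecute(), where the limit is planned
// with a single-partition exchange, so an extra shuffle stage is expected.
df.rdd.count()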
Anyone know the hive repo for spark-2.0?
I saw that the pom file has the hive version as 1.2.1.spark2, but I cannot find the corresponding branch in https://github.com/pwendell/
Does anyone know where the repo is? Thanks.

Zhan Zhang

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Anyone-knows-the-hive-repo-for-spark-2-0-tp18234.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
SparkPlan/Shuffle stage reuse with Dataset/DataFrame
Hi Folks,

We have some Dataset/DataFrame use cases that would benefit from reusing the SparkPlan and shuffle stage. Consider the following case. Because the query optimization and SparkPlan are generated by Catalyst at execution time, the underlying RDD lineage is regenerated for dataset1 on every action, so the shuffle stage will be executed multiple times:

val dataset1 = dataset.groupBy(...).agg(...)
dataset1.registerTempTable("tmpTable")
spark.sql("select * from tmpTable where condition").collect
spark.sql("select * from tmpTable where condition1").collect

On the one hand, we get an optimized query plan, but on the other hand, we cannot reuse the data generated by the shuffle stage.

Currently, to reuse dataset1, we have to use persist to cache the data. That is helpful but sometimes not what we want, as it has side effects. For example, we cannot release an executor that holds active cached blocks even when it is idle and dynamic allocation is enabled. In other words, we only want to reuse the shuffle data as much as possible, without caching, in a long pipeline with multiple shuffle stages.

I am wondering whether it makes sense to add a new feature to Dataset/DataFrame that works as a barrier and prevents query optimization from happening across the barrier. For example, in the above case, we want Catalyst to treat tmpTable as a barrier and stop optimizing across it, so that we can reuse the underlying RDD lineage of dataset1. (A fuller sketch of the pattern follows at the end of this message.)

The prototype code to make this work is quite small, and we tried it in house with a new API, Dataset.cacheShuffle. But I want some feedback from the community before opening a JIRA, since in some sense it does stop the optimization earlier. Any comments?

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/SparkPlan-Shuffle-stage-reuse-with-Dataset-DataFrame-tp19502.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
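[Editorial note] For reference, a self-contained sketch of the pattern described in this mail. The column names key and value and the input DataFrame ds are illustrative, spark is a SparkSession, and the cacheShuffle call at the end is the proposed API from this thread, not something that exists today:

import org.apache.spark.sql.functions._

val dataset1 = ds.groupBy("key").agg(sum("value").as("total"))
dataset1.createOrReplaceTempView("tmpTable")   // registerTempTable on older versions

// Each query below is re-optimized from dataset1's logical plan, so the groupBy
// shuffle runs again for every action:
spark.sql("select * from tmpTable where total > 100").collect()
spark.sql("select * from tmpTable where total < 10").collect()

// Current workaround: persist the aggregate so the shuffle only runs once, at the
// cost of pinning cached blocks on executors (which blocks dynamic deallocation):
dataset1.persist()
spark.sql("select * from tmpTable where total > 100").collect()
spark.sql("select * from tmpTable where total < 10").collect()

// Proposed (not existing) API from this thread, acting as an optimization barrier
// so that only the shuffle output is reused, without long-lived caching:
// dataset1.cacheShuffle()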